From 351fcae472aeee042b7f9079ca9a04add9a68cfc Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 6 Mar 2024 23:04:25 +0900 Subject: [PATCH] Fix race condition that was surfaced due to the debouncing of the control plane build plan. In Ingest v1, queues are created on apply plans. This means that right after creating an index, documents cannot be inserted. This was true before, but it became truely apparent after we added debouncing of control plane apply plans. --- .../src/control_plane.rs | 54 +++++++++----- .../src/indexing_scheduler/mod.rs | 70 ++++++++++++++++++- .../quickwit-ingest/src/ingest_api_service.rs | 3 +- .../aggregations/_setup.quickwit.yaml | 3 - .../_setup.quickwit.yaml | 1 - .../es_compatibility/_setup.quickwit.yaml | 6 +- .../multi-indices/_setup.quickwit.yaml | 1 - .../_setup.quickwit.yaml | 4 +- .../qw_search_api/_setup.quickwit.yaml | 2 - .../scenarii/sort_orders/_setup.quickwit.yaml | 2 - 10 files changed, 108 insertions(+), 38 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 61876986d99..4807a9dafe9 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -25,10 +25,9 @@ use std::time::Duration; use anyhow::Context; use async_trait::async_trait; use futures::stream::FuturesUnordered; -use futures::StreamExt; +use futures::{Future, StreamExt}; use quickwit_actors::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe, - WeakMailbox, + Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox, Supervisor, Universe, WeakMailbox }; use quickwit_cluster::{ ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory, ClusterNode, @@ -97,6 +96,7 @@ pub struct ControlPlane { rebuild_plan_debouncer: Debouncer, } + impl fmt::Debug for ControlPlane { fn fmt(&self, f: &mut Formatter) -> fmt::Result { f.debug_struct("ControlPlane").finish() @@ -165,7 +165,7 @@ impl Actor for ControlPlane { .await .context("failed to initialize the model")?; - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); self.ingest_controller.sync_with_all_ingesters(&self.model); @@ -313,9 +313,15 @@ impl ControlPlane { /// /// This method includes some debouncing logic. Every call will be followed by a cooldown /// period. - fn rebuild_plan_debounced(&mut self, ctx: &ActorContext) { + /// + /// This method returns a future that can awaited to ensure that the relevant rebuild plan + /// operation has been executed. + fn rebuild_plan_debounced(&mut self, ctx: &ActorContext) -> impl Future { + let current_generation_processed_waiter = + self.indexing_scheduler.next_rebuild_tracker.current_generation_processed_waiter(); self.rebuild_plan_debouncer .self_send_with_cooldown::(ctx); + current_generation_processed_waiter } } @@ -370,7 +376,7 @@ impl Handler for ControlPlane { ctx.progress(), ) .await?; - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); Ok(()) } } @@ -446,17 +452,22 @@ fn convert_metastore_error( // This handler is a metastore call proxied through the control plane: we must first forward the // request to the metastore, and then act on the event. #[async_trait] -impl Handler for ControlPlane { +impl DeferableReplyHandler for ControlPlane { type Reply = ControlPlaneResult; - async fn handle( + + async fn handle_message( &mut self, request: CreateIndexRequest, + reply: impl FnOnce(Self::Reply) + Send + Sync + 'static, ctx: &ActorContext, - ) -> Result { + ) -> Result<(), ActorExitStatus> { let response = match self.metastore.create_index(request).await { Ok(response) => response, - Err(metastore_error) => return convert_metastore_error(metastore_error), + Err(metastore_error) => { + reply(convert_metastore_error(metastore_error)?); + return Ok(()); + }, }; let index_metadata: IndexMetadata = match serde_utils::from_json_str(&response.index_metadata_json) { @@ -472,9 +483,16 @@ impl Handler for ControlPlane { self.model.add_index(index_metadata); if should_rebuild_plan { - self.rebuild_plan_debounced(ctx); + let wait_for_debounced = self.rebuild_plan_debounced(ctx); + tokio::task::spawn(async move { + wait_for_debounced.await; + reply(Ok(response)); + }); + } else { + reply(Ok(response)); } - Ok(Ok(response)) + + Ok(()) } } @@ -508,7 +526,7 @@ impl Handler for ControlPlane { // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -544,7 +562,7 @@ impl Handler for ControlPlane { // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -572,7 +590,7 @@ impl Handler for ControlPlane { let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?; if mutation_occured { - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); } Ok(Ok(EmptyResponse {})) } @@ -620,7 +638,7 @@ impl Handler for ControlPlane { self.model.delete_source(&source_uid); - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -656,7 +674,7 @@ impl Handler for ControlPlane { return Ok(Err(control_plane_error)); } }; - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); Ok(Ok(response)) } } @@ -690,7 +708,7 @@ impl Handler for ControlPlane { self.ingest_controller .handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress()) .await; - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); Ok(Ok(())) } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 474e1ec7344..31261eb7752 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -22,6 +22,7 @@ mod scheduling; use std::cmp::Ordering; use std::fmt; use std::num::NonZeroU32; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use fnv::{FnvHashMap, FnvHashSet}; @@ -56,6 +57,64 @@ pub struct IndexingSchedulerState { pub last_applied_plan_timestamp: Option, } + +pub struct ChangeTracker { + generation_processed_tx: Arc>>, + generation_processed_rx: tokio::sync::watch::Receiver, + generation: usize, +} + +impl Default for ChangeTracker { + fn default() -> Self { + let (generation_processed_tx, generation_processed_rx) = tokio::sync::watch::channel(0); + ChangeTracker { + generation_processed_tx: Arc::new(Mutex::new(generation_processed_tx)), + generation_processed_rx, + generation: 1, + } + } +} + +impl ChangeTracker { + pub fn current_generation_processed_waiter(&mut self) -> impl std::future::Future { + let mut generation_processed_rx = self.generation_processed_rx.clone(); + let current_generation = self.generation; + async move { + loop { + if *generation_processed_rx.borrow() >= current_generation { + return; + } + if generation_processed_rx.changed().await.is_err() { + return; + } + } + } + } + + pub fn start_process_generation(&mut self) -> Arc { + let generation = self.generation; + self.generation += 1; + Arc::new(NotifyChangeOnDrop { + generation, + generation_processed_tx: self.generation_processed_tx.clone(), + }) + } +} + +pub struct NotifyChangeOnDrop { + generation: usize, + generation_processed_tx: Arc>>, +} + +impl Drop for NotifyChangeOnDrop { + fn drop(&mut self) { + let generation_processed_tx = self.generation_processed_tx.lock().unwrap(); + if self.generation < *generation_processed_tx.borrow() { + return; + } + let _ = generation_processed_tx.send(self.generation); + } +} /// The [`IndexingScheduler`] is responsible for listing indexing tasks and assiging them to /// indexers. /// We call this duty `scheduling`. Contrary to what the name suggests, most indexing tasks are @@ -100,6 +159,7 @@ pub struct IndexingScheduler { self_node_id: NodeId, indexer_pool: IndexerPool, state: IndexingSchedulerState, + pub(crate) next_rebuild_tracker: ChangeTracker, } impl fmt::Debug for IndexingScheduler { @@ -187,6 +247,7 @@ impl IndexingScheduler { self_node_id, indexer_pool, state: IndexingSchedulerState::default(), + next_rebuild_tracker: ChangeTracker::default(), } } @@ -202,6 +263,8 @@ impl IndexingScheduler { pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) { crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc(); + let notify_on_drop = self.next_rebuild_tracker.start_process_generation(); + let sources = get_sources_to_schedule(model); let indexers: Vec = self.get_indexers_from_indexer_pool(); @@ -239,7 +302,7 @@ impl IndexingScheduler { return; } } - self.apply_physical_indexing_plan(&indexers, new_physical_plan); + self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop)); self.state.num_schedule_indexing_plan += 1; } @@ -283,7 +346,7 @@ impl IndexingScheduler { } else if !indexing_plans_diff.has_same_tasks() { // Some nodes may have not received their tasks, apply it again. info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan"); - self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone()); + self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone(), None); } } @@ -295,12 +358,14 @@ impl IndexingScheduler { &mut self, indexers: &[IndexerNodeInfo], new_physical_plan: PhysicalIndexingPlan, + notify_on_drop: Option> ) { debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan"); for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() { // We don't want to block on a slow indexer so we apply this change asynchronously // TODO not blocking is cool, but we need to make sure there is not accumulation // possible here. + let notify_on_drop = notify_on_drop.clone(); tokio::spawn({ let indexer = indexers .iter() @@ -322,6 +387,7 @@ impl IndexingScheduler { "failed to apply indexing plan to indexer" ); } + drop(notify_on_drop); } }); } diff --git a/quickwit/quickwit-ingest/src/ingest_api_service.rs b/quickwit/quickwit-ingest/src/ingest_api_service.rs index 33df248475f..d1150be504f 100644 --- a/quickwit/quickwit-ingest/src/ingest_api_service.rs +++ b/quickwit/quickwit-ingest/src/ingest_api_service.rs @@ -27,7 +27,7 @@ use quickwit_actors::{ }; use quickwit_common::runtimes::RuntimeType; use quickwit_common::tower::Cost; -use tracing::info; +use tracing::{error, info}; use ulid::Ulid; use crate::metrics::INGEST_METRICS; @@ -153,6 +153,7 @@ impl IngestApiService { .find(|index_id| !self.queues.queue_exists(index_id)); if let Some(index_id) = first_non_existing_queue_opt { + error!(index_id=%index_id, "failed to find index"); return Err(IngestServiceError::IndexNotFound { index_id: index_id.to_string(), }); diff --git a/quickwit/rest-api-tests/scenarii/aggregations/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/aggregations/_setup.quickwit.yaml index 6d44e41b7dd..d9b3bd54d80 100644 --- a/quickwit/rest-api-tests/scenarii/aggregations/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/aggregations/_setup.quickwit.yaml @@ -25,7 +25,6 @@ json: - rfc3339 fast_precision: seconds fast: true -sleep_after: 3 --- # Create empty index method: POST @@ -45,12 +44,10 @@ json: - rfc3339 fast_precision: seconds fast: true -sleep_after: 3 --- # Ingest documents method: POST endpoint: aggregations/ingest -num_retries: 10 params: commit: force ndjson: diff --git a/quickwit/rest-api-tests/scenarii/default_search_fields/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/default_search_fields/_setup.quickwit.yaml index 253d011462d..5d06a5a68a5 100644 --- a/quickwit/rest-api-tests/scenarii/default_search_fields/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/default_search_fields/_setup.quickwit.yaml @@ -29,7 +29,6 @@ json: --- method: POST endpoint: defaultsearchfields/ingest -num_retries: 10 params: commit: force ndjson: diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml index 2e227090a9c..49ac827c44f 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml @@ -16,7 +16,6 @@ json: - name: created_at type: datetime fast: true -sleep_after: 3 --- # Create index method: POST @@ -36,7 +35,7 @@ json: indexed: true - name: public type: bool - fast: false + fast: false indexed: true - name: created_at type: datetime @@ -48,14 +47,11 @@ json: fast: normalizer: lowercase record: position -sleep_after: 3 --- # Ingest documents method: POST endpoint: _bulk -num_retries: 10 params: refresh: "true" headers: {"Content-Type": "application/json", "content-encoding": "gzip"} body_from_file: gharchive-bulk.json.gz -sleep_after: 3 diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/multi-indices/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/multi-indices/_setup.quickwit.yaml index b160d5c2cbc..7962adeb866 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/multi-indices/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/multi-indices/_setup.quickwit.yaml @@ -62,7 +62,6 @@ json: - name: login type: text fast: true -sleep_after: 6 --- # Ingest documents in index 1 method: POST diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml index de0f7b96b05..8b02ee01882 100644 --- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml @@ -55,13 +55,11 @@ json: - name: host type: ip fast: true -sleep_after: 3 --- # Ingest documents method: POST api_root: http://localhost:7280/api/v1/ endpoint: fieldcaps/ingest -num_retries: 10 params: commit: force ndjson: @@ -85,7 +83,7 @@ params: commit: force ndjson: - {"mixed": 5, "date": "2023-01-10T12:00:00Z"} # inter split mixed type - - {"mixed": -5.5, "date": "2024-01-10T12:00:00Z"} + - {"mixed": -5.5, "date": "2024-01-10T12:00:00Z"} --- # Ingest documents in index fieldcaps-2 method: POST diff --git a/quickwit/rest-api-tests/scenarii/qw_search_api/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/qw_search_api/_setup.quickwit.yaml index 70088e26027..1079499d7bb 100644 --- a/quickwit/rest-api-tests/scenarii/qw_search_api/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/qw_search_api/_setup.quickwit.yaml @@ -23,12 +23,10 @@ json: tokenizer: default expand_dots: true fast: true -sleep_after: 3 --- # Ingest documents method: POST endpoint: simple/ingest -num_retries: 10 params: commit: force ndjson: diff --git a/quickwit/rest-api-tests/scenarii/sort_orders/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/sort_orders/_setup.quickwit.yaml index a0e36e67274..8ba6415c01c 100644 --- a/quickwit/rest-api-tests/scenarii/sort_orders/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/sort_orders/_setup.quickwit.yaml @@ -14,12 +14,10 @@ json: dynamic_mapping: tokenizer: default fast: true -sleep_after: 3 --- # Ingest documents method: POST endpoint: sortorder/ingest -num_retries: 10 params: commit: force ndjson: