diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 61876986d99..4807a9dafe9 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -25,10 +25,9 @@ use std::time::Duration; use anyhow::Context; use async_trait::async_trait; use futures::stream::FuturesUnordered; -use futures::StreamExt; +use futures::{Future, StreamExt}; use quickwit_actors::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe, - WeakMailbox, + Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox, Supervisor, Universe, WeakMailbox }; use quickwit_cluster::{ ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory, ClusterNode, @@ -97,6 +96,7 @@ pub struct ControlPlane { rebuild_plan_debouncer: Debouncer, } + impl fmt::Debug for ControlPlane { fn fmt(&self, f: &mut Formatter) -> fmt::Result { f.debug_struct("ControlPlane").finish() @@ -165,7 +165,7 @@ impl Actor for ControlPlane { .await .context("failed to initialize the model")?; - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); self.ingest_controller.sync_with_all_ingesters(&self.model); @@ -313,9 +313,15 @@ impl ControlPlane { /// /// This method includes some debouncing logic. Every call will be followed by a cooldown /// period. - fn rebuild_plan_debounced(&mut self, ctx: &ActorContext) { + /// + /// This method returns a future that can awaited to ensure that the relevant rebuild plan + /// operation has been executed. + fn rebuild_plan_debounced(&mut self, ctx: &ActorContext) -> impl Future { + let current_generation_processed_waiter = + self.indexing_scheduler.next_rebuild_tracker.current_generation_processed_waiter(); self.rebuild_plan_debouncer .self_send_with_cooldown::(ctx); + current_generation_processed_waiter } } @@ -370,7 +376,7 @@ impl Handler for ControlPlane { ctx.progress(), ) .await?; - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); Ok(()) } } @@ -446,17 +452,22 @@ fn convert_metastore_error( // This handler is a metastore call proxied through the control plane: we must first forward the // request to the metastore, and then act on the event. #[async_trait] -impl Handler for ControlPlane { +impl DeferableReplyHandler for ControlPlane { type Reply = ControlPlaneResult; - async fn handle( + + async fn handle_message( &mut self, request: CreateIndexRequest, + reply: impl FnOnce(Self::Reply) + Send + Sync + 'static, ctx: &ActorContext, - ) -> Result { + ) -> Result<(), ActorExitStatus> { let response = match self.metastore.create_index(request).await { Ok(response) => response, - Err(metastore_error) => return convert_metastore_error(metastore_error), + Err(metastore_error) => { + reply(convert_metastore_error(metastore_error)?); + return Ok(()); + }, }; let index_metadata: IndexMetadata = match serde_utils::from_json_str(&response.index_metadata_json) { @@ -472,9 +483,16 @@ impl Handler for ControlPlane { self.model.add_index(index_metadata); if should_rebuild_plan { - self.rebuild_plan_debounced(ctx); + let wait_for_debounced = self.rebuild_plan_debounced(ctx); + tokio::task::spawn(async move { + wait_for_debounced.await; + reply(Ok(response)); + }); + } else { + reply(Ok(response)); } - Ok(Ok(response)) + + Ok(()) } } @@ -508,7 +526,7 @@ impl Handler for ControlPlane { // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -544,7 +562,7 @@ impl Handler for ControlPlane { // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -572,7 +590,7 @@ impl Handler for ControlPlane { let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?; if mutation_occured { - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); } Ok(Ok(EmptyResponse {})) } @@ -620,7 +638,7 @@ impl Handler for ControlPlane { self.model.delete_source(&source_uid); - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -656,7 +674,7 @@ impl Handler for ControlPlane { return Ok(Err(control_plane_error)); } }; - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); Ok(Ok(response)) } } @@ -690,7 +708,7 @@ impl Handler for ControlPlane { self.ingest_controller .handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress()) .await; - self.rebuild_plan_debounced(ctx); + let _ = self.rebuild_plan_debounced(ctx); Ok(Ok(())) } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 474e1ec7344..31261eb7752 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -22,6 +22,7 @@ mod scheduling; use std::cmp::Ordering; use std::fmt; use std::num::NonZeroU32; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use fnv::{FnvHashMap, FnvHashSet}; @@ -56,6 +57,64 @@ pub struct IndexingSchedulerState { pub last_applied_plan_timestamp: Option, } + +pub struct ChangeTracker { + generation_processed_tx: Arc>>, + generation_processed_rx: tokio::sync::watch::Receiver, + generation: usize, +} + +impl Default for ChangeTracker { + fn default() -> Self { + let (generation_processed_tx, generation_processed_rx) = tokio::sync::watch::channel(0); + ChangeTracker { + generation_processed_tx: Arc::new(Mutex::new(generation_processed_tx)), + generation_processed_rx, + generation: 1, + } + } +} + +impl ChangeTracker { + pub fn current_generation_processed_waiter(&mut self) -> impl std::future::Future { + let mut generation_processed_rx = self.generation_processed_rx.clone(); + let current_generation = self.generation; + async move { + loop { + if *generation_processed_rx.borrow() >= current_generation { + return; + } + if generation_processed_rx.changed().await.is_err() { + return; + } + } + } + } + + pub fn start_process_generation(&mut self) -> Arc { + let generation = self.generation; + self.generation += 1; + Arc::new(NotifyChangeOnDrop { + generation, + generation_processed_tx: self.generation_processed_tx.clone(), + }) + } +} + +pub struct NotifyChangeOnDrop { + generation: usize, + generation_processed_tx: Arc>>, +} + +impl Drop for NotifyChangeOnDrop { + fn drop(&mut self) { + let generation_processed_tx = self.generation_processed_tx.lock().unwrap(); + if self.generation < *generation_processed_tx.borrow() { + return; + } + let _ = generation_processed_tx.send(self.generation); + } +} /// The [`IndexingScheduler`] is responsible for listing indexing tasks and assiging them to /// indexers. /// We call this duty `scheduling`. Contrary to what the name suggests, most indexing tasks are @@ -100,6 +159,7 @@ pub struct IndexingScheduler { self_node_id: NodeId, indexer_pool: IndexerPool, state: IndexingSchedulerState, + pub(crate) next_rebuild_tracker: ChangeTracker, } impl fmt::Debug for IndexingScheduler { @@ -187,6 +247,7 @@ impl IndexingScheduler { self_node_id, indexer_pool, state: IndexingSchedulerState::default(), + next_rebuild_tracker: ChangeTracker::default(), } } @@ -202,6 +263,8 @@ impl IndexingScheduler { pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) { crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc(); + let notify_on_drop = self.next_rebuild_tracker.start_process_generation(); + let sources = get_sources_to_schedule(model); let indexers: Vec = self.get_indexers_from_indexer_pool(); @@ -239,7 +302,7 @@ impl IndexingScheduler { return; } } - self.apply_physical_indexing_plan(&indexers, new_physical_plan); + self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop)); self.state.num_schedule_indexing_plan += 1; } @@ -283,7 +346,7 @@ impl IndexingScheduler { } else if !indexing_plans_diff.has_same_tasks() { // Some nodes may have not received their tasks, apply it again. info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan"); - self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone()); + self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone(), None); } } @@ -295,12 +358,14 @@ impl IndexingScheduler { &mut self, indexers: &[IndexerNodeInfo], new_physical_plan: PhysicalIndexingPlan, + notify_on_drop: Option> ) { debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan"); for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() { // We don't want to block on a slow indexer so we apply this change asynchronously // TODO not blocking is cool, but we need to make sure there is not accumulation // possible here. + let notify_on_drop = notify_on_drop.clone(); tokio::spawn({ let indexer = indexers .iter() @@ -322,6 +387,7 @@ impl IndexingScheduler { "failed to apply indexing plan to indexer" ); } + drop(notify_on_drop); } }); } diff --git a/quickwit/quickwit-ingest/src/ingest_api_service.rs b/quickwit/quickwit-ingest/src/ingest_api_service.rs index 33df248475f..d1150be504f 100644 --- a/quickwit/quickwit-ingest/src/ingest_api_service.rs +++ b/quickwit/quickwit-ingest/src/ingest_api_service.rs @@ -27,7 +27,7 @@ use quickwit_actors::{ }; use quickwit_common::runtimes::RuntimeType; use quickwit_common::tower::Cost; -use tracing::info; +use tracing::{error, info}; use ulid::Ulid; use crate::metrics::INGEST_METRICS; @@ -153,6 +153,7 @@ impl IngestApiService { .find(|index_id| !self.queues.queue_exists(index_id)); if let Some(index_id) = first_non_existing_queue_opt { + error!(index_id=%index_id, "failed to find index"); return Err(IngestServiceError::IndexNotFound { index_id: index_id.to_string(), }); diff --git a/quickwit/rest-api-tests/scenarii/aggregations/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/aggregations/_setup.quickwit.yaml index 6d44e41b7dd..d9b3bd54d80 100644 --- a/quickwit/rest-api-tests/scenarii/aggregations/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/aggregations/_setup.quickwit.yaml @@ -25,7 +25,6 @@ json: - rfc3339 fast_precision: seconds fast: true -sleep_after: 3 --- # Create empty index method: POST @@ -45,12 +44,10 @@ json: - rfc3339 fast_precision: seconds fast: true -sleep_after: 3 --- # Ingest documents method: POST endpoint: aggregations/ingest -num_retries: 10 params: commit: force ndjson: diff --git a/quickwit/rest-api-tests/scenarii/default_search_fields/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/default_search_fields/_setup.quickwit.yaml index 253d011462d..5d06a5a68a5 100644 --- a/quickwit/rest-api-tests/scenarii/default_search_fields/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/default_search_fields/_setup.quickwit.yaml @@ -29,7 +29,6 @@ json: --- method: POST endpoint: defaultsearchfields/ingest -num_retries: 10 params: commit: force ndjson: diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml index 2e227090a9c..49ac827c44f 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml @@ -16,7 +16,6 @@ json: - name: created_at type: datetime fast: true -sleep_after: 3 --- # Create index method: POST @@ -36,7 +35,7 @@ json: indexed: true - name: public type: bool - fast: false + fast: false indexed: true - name: created_at type: datetime @@ -48,14 +47,11 @@ json: fast: normalizer: lowercase record: position -sleep_after: 3 --- # Ingest documents method: POST endpoint: _bulk -num_retries: 10 params: refresh: "true" headers: {"Content-Type": "application/json", "content-encoding": "gzip"} body_from_file: gharchive-bulk.json.gz -sleep_after: 3 diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/multi-indices/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/multi-indices/_setup.quickwit.yaml index b160d5c2cbc..7962adeb866 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/multi-indices/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/multi-indices/_setup.quickwit.yaml @@ -62,7 +62,6 @@ json: - name: login type: text fast: true -sleep_after: 6 --- # Ingest documents in index 1 method: POST diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml index de0f7b96b05..8b02ee01882 100644 --- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml @@ -55,13 +55,11 @@ json: - name: host type: ip fast: true -sleep_after: 3 --- # Ingest documents method: POST api_root: http://localhost:7280/api/v1/ endpoint: fieldcaps/ingest -num_retries: 10 params: commit: force ndjson: @@ -85,7 +83,7 @@ params: commit: force ndjson: - {"mixed": 5, "date": "2023-01-10T12:00:00Z"} # inter split mixed type - - {"mixed": -5.5, "date": "2024-01-10T12:00:00Z"} + - {"mixed": -5.5, "date": "2024-01-10T12:00:00Z"} --- # Ingest documents in index fieldcaps-2 method: POST diff --git a/quickwit/rest-api-tests/scenarii/qw_search_api/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/qw_search_api/_setup.quickwit.yaml index 70088e26027..1079499d7bb 100644 --- a/quickwit/rest-api-tests/scenarii/qw_search_api/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/qw_search_api/_setup.quickwit.yaml @@ -23,12 +23,10 @@ json: tokenizer: default expand_dots: true fast: true -sleep_after: 3 --- # Ingest documents method: POST endpoint: simple/ingest -num_retries: 10 params: commit: force ndjson: diff --git a/quickwit/rest-api-tests/scenarii/sort_orders/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/sort_orders/_setup.quickwit.yaml index a0e36e67274..8ba6415c01c 100644 --- a/quickwit/rest-api-tests/scenarii/sort_orders/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/sort_orders/_setup.quickwit.yaml @@ -14,12 +14,10 @@ json: dynamic_mapping: tokenizer: default fast: true -sleep_after: 3 --- # Ingest documents method: POST endpoint: sortorder/ingest -num_retries: 10 params: commit: force ndjson: