diff --git a/quickwit/quickwit-common/src/progress.rs b/quickwit/quickwit-common/src/progress.rs index 01293b3bb90..4d8e759aa03 100644 --- a/quickwit/quickwit-common/src/progress.rs +++ b/quickwit/quickwit-common/src/progress.rs @@ -20,6 +20,8 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use futures::Future; + /// Progress makes it possible to register some progress. /// It is used in lieu of healthcheck. /// @@ -83,6 +85,13 @@ impl Progress { .fetch_max(ProgressState::Updated.into(), Ordering::Relaxed); } + /// Executes a future in a protected zone. + pub async fn protect_future(&self, future: Fut) -> T + where Fut: Future { + let _guard = self.protect_zone(); + future.await + } + pub fn protect_zone(&self) -> ProtectedZoneGuard { loop { let previous_state: ProgressState = self.0.load(Ordering::SeqCst).into(); diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index f2412616a23..f38de7a6ef1 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -18,46 +18,70 @@ // along with this program. If not, see . use std::sync::Arc; +use std::time::Duration; use anyhow::Context; use async_trait::async_trait; use quickwit_actors::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, AskError, Handler, Mailbox, Universe, + Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Universe, }; use quickwit_config::{IndexConfig, SourceConfig}; use quickwit_ingest::IngesterPool; use quickwit_metastore::Metastore; use quickwit_proto::control_plane::{ CloseShardsRequest, CloseShardsResponse, ControlPlaneError, ControlPlaneResult, - GetOpenShardsRequest, GetOpenShardsResponse, NotifyIndexChangeRequest, - NotifyIndexChangeResponse, -}; -use quickwit_proto::metastore::events::{ - AddSourceEvent, CreateIndexEvent, DeleteIndexEvent, DeleteSourceEvent, ToggleSourceEvent, + GetOpenShardsRequest, GetOpenShardsResponse, }; +use quickwit_proto::metastore::events::ToggleSourceEvent; use quickwit_proto::metastore::{ serde_utils as metastore_serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest, DeleteSourceRequest, EmptyResponse, ToggleSourceRequest, }; use quickwit_proto::{IndexUid, NodeId}; -use tracing::debug; +use serde::Serialize; +use tracing::error; +use crate::ingest::ingest_controller::IngestControllerState; use crate::ingest::IngestController; -use crate::scheduler::IndexingScheduler; +use crate::scheduler::{IndexingScheduler, IndexingSchedulerState}; use crate::IndexerPool; +/// Interval between two controls (or checks) of the desired plan VS running plan. +pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { + Duration::from_millis(500) +} else { + Duration::from_secs(3) +}; + +/// Interval between two scheduling of indexing plans. No need to be faster than the +/// control plan loop. +// Note: it's currently not possible to define a const duration with +// `CONTROL_PLAN_LOOP_INTERVAL * number`. +pub(crate) const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { + Duration::from_secs(3) +} else { + Duration::from_secs(60) +}; + +#[derive(Debug)] +struct RefreshPlanLoop; + +#[derive(Debug)] +struct ControlPlanLoop; + #[derive(Debug)] pub struct ControlPlane { metastore: Arc, - // N.B.: The control plane is an actor and both the indexing scheduler and the ingest - // controller are also actors. 
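Stepping back to the quickwit-common change at the top of this diff: `Progress::protect_future` wraps an await in the same protected zone that `protect_zone` opens manually, so a long-running call (typically a metastore request) is not mistaken for a stuck actor by the health check. A minimal usage sketch, assuming only the `Progress` API shown above (the `list_index_ids` helper is a stand-in, not part of the crate):

```rust
use quickwit_common::Progress;

// Stand-in for a slow async operation such as a metastore call.
async fn list_index_ids() -> Vec<String> {
    Vec::new()
}

#[tokio::main]
async fn main() {
    let progress = Progress::default();
    // While the wrapped future is pending, the progress state stays in the
    // protected zone, so the caller is still considered alive even if it
    // records no explicit progress update.
    let index_ids = progress.protect_future(list_index_ids()).await;
    assert!(index_ids.is_empty());
    // Equivalent to holding the guard manually across the await point:
    // let _guard = progress.protect_zone();
    // list_index_ids().await;
}
```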
It might be simpler to have a single actor for the control - // plane and replace message passing with function calls. The control plane would be in - // charge of driving the control loop of controllers. - indexing_scheduler_mailbox: Mailbox, - indexing_scheduler_handle: ActorHandle, - ingest_controller_mailbox: Mailbox, - ingest_controller_handle: ActorHandle, + // The control plane state is split into two independent functions that are naturally isolated + // code-wise and state-wise. + // + // - The indexing scheduler is in charge of managing indexers: it decides which indexer should + // index which source/shards. + // - The ingest controller is in charge of managing ingesters: it opens and closes shards on + // the different ingesters. + indexing_scheduler: IndexingScheduler, + ingest_controller: IngestController, } impl ControlPlane { @@ -72,50 +96,68 @@ impl ControlPlane { ) -> (Mailbox, ActorHandle) { let indexing_scheduler = IndexingScheduler::new(cluster_id, self_node_id, metastore.clone(), indexer_pool); - let (indexing_scheduler_mailbox, indexing_scheduler_handle) = - universe.spawn_builder().spawn(indexing_scheduler); - let ingester_controller = + let ingest_controller = IngestController::new(metastore.clone(), ingester_pool, replication_factor); - let (ingest_controller_mailbox, ingest_controller_handle) = - universe.spawn_builder().spawn(ingester_controller); let control_plane = Self { metastore, - indexing_scheduler_mailbox, - indexing_scheduler_handle, - ingest_controller_mailbox, - ingest_controller_handle, + indexing_scheduler, + ingest_controller, }; universe.spawn_builder().spawn(control_plane) } } +#[derive(Debug, Clone, Serialize, Default)] +pub struct ControlPlaneObservableState { + pub ingester_controller: IngestControllerState, + pub indexing_scheduler: IndexingSchedulerState, +} + #[async_trait] impl Actor for ControlPlane { - type ObservableState = ( - ::ObservableState, - ::ObservableState, - ); + type ObservableState = ControlPlaneObservableState; fn name(&self) -> String { "ControlPlane".to_string() } fn observable_state(&self) -> Self::ObservableState { - ( - self.indexing_scheduler_handle.last_observation(), - self.ingest_controller_handle.last_observation(), - ) + ControlPlaneObservableState { + ingester_controller: self.ingest_controller.observable_state(), + indexing_scheduler: self.indexing_scheduler.observable_state(), + } + } + + async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { + self.ingest_controller + .load_state(ctx.progress()) + .await + .context("failed to initialize ingest controller")?; + + self.handle(RefreshPlanLoop, ctx).await?; + ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) + .await; + + Ok(()) + } } -macro_rules!
handle_ask_res { - ($ask_res:expr) => { - match $ask_res { - Ok(response) => response, - Err(AskError::ErrorReply(error)) => return Ok(Err(error)), - Err(error) => return Err(ActorExitStatus::Failure(anyhow::anyhow!(error).into())), +#[async_trait] +impl Handler for ControlPlane { + type Reply = (); + + async fn handle( + &mut self, + _message: ControlPlanLoop, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + if let Err(error) = self.indexing_scheduler.control_running_plan().await { + error!("Error when controlling the running plan: `{}`.", error); } - }; + ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) + .await; + Ok(()) + } } #[async_trait] @@ -125,7 +167,7 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: CreateIndexRequest, - ctx: &ActorContext, + _ctx: &ActorContext, ) -> Result { let index_config: IndexConfig = match metastore_serde_utils::from_json_str(&request.index_config_json) { @@ -140,16 +182,11 @@ impl Handler for ControlPlane { return Ok(Err(ControlPlaneError::from(error))); } }; - let event = CreateIndexEvent { - index_uid: index_uid.clone(), - }; - handle_ask_res!( - ctx.ask_for_res(&self.ingest_controller_mailbox, event.clone()) - .await - ); + self.ingest_controller.add_index(index_uid.clone()); let response = CreateIndexResponse { index_uid: index_uid.into(), }; + // We do not need to inform the indexing scheduler as there are no shards at this point. Ok(Ok(response)) } } @@ -161,25 +198,48 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: DeleteIndexRequest, - ctx: &ActorContext, + _ctx: &ActorContext, ) -> Result { let index_uid: IndexUid = request.index_uid.into(); if let Err(error) = self.metastore.delete_index(index_uid.clone()).await { return Ok(Err(ControlPlaneError::from(error))); }; - let event = DeleteIndexEvent { index_uid }; - handle_ask_res!( - ctx.ask_for_res(&self.ingest_controller_mailbox, event.clone()) - .await - ); + self.ingest_controller.delete_index(&index_uid); let response = EmptyResponse {}; + + // TODO: Refine the event. Notify index will have the effect to reload the entire state from + // the metastore. We should update the state of the control plane. 
+ self.indexing_scheduler.on_index_change().await?; + Ok(Ok(response)) } } +#[async_trait] +impl Handler for ControlPlane { + type Reply = (); + + async fn handle( + &mut self, + _message: RefreshPlanLoop, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + if let Err(error) = self + .indexing_scheduler + .schedule_indexing_plan_if_needed() + .await + { + error!("Error when scheduling indexing plan: `{}`.", error); + } + ctx.schedule_self_msg(REFRESH_PLAN_LOOP_INTERVAL, RefreshPlanLoop) + .await; + Ok(()) + } +} + #[async_trait] impl Handler for ControlPlane { type Reply = ControlPlaneResult; @@ -187,7 +247,7 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: AddSourceRequest, - ctx: &ActorContext, + _ctx: &ActorContext, ) -> Result { let index_uid: IndexUid = request.index_uid.into(); let source_config: SourceConfig = @@ -198,7 +258,6 @@ impl Handler for ControlPlane { } }; let source_id = source_config.source_id.clone(); - let source_type = source_config.source_type(); if let Err(error) = self .metastore @@ -207,16 +266,13 @@ impl Handler for ControlPlane { { return Ok(Err(ControlPlaneError::from(error))); }; - let event = AddSourceEvent { - index_uid, - source_id, - source_type, - }; - handle_ask_res!( - ctx.ask_for_res(&self.ingest_controller_mailbox, event) - .await - ); - // TODO: Notify indexing controller. + + self.ingest_controller.add_source(&index_uid, &source_id); + + // TODO: Refine the event. Notify index will have the effect to reload the entire state from + // the metastore. We should update the state of the control plane. + self.indexing_scheduler.on_index_change().await?; + let response = EmptyResponse {}; Ok(Ok(response)) } @@ -245,7 +301,10 @@ impl Handler for ControlPlane { source_id: request.source_id, enabled: request.enable, }; - // TODO: Notify indexing controller. + // TODO: Refine the event. Notify index will have the effect to reload the entire state from + // the metastore. We should update the state of the control plane. + self.indexing_scheduler.on_index_change().await?; + let response = EmptyResponse {}; Ok(Ok(response)) } @@ -269,35 +328,15 @@ impl Handler for ControlPlane { { return Ok(Err(ControlPlaneError::from(error))); }; - let _event = DeleteSourceEvent { - index_uid, - source_id: request.source_id, - }; - // TODO: Notify indexing controller. + + self.ingest_controller + .delete_source(&index_uid, &request.source_id); + self.indexing_scheduler.on_index_change().await?; let response = EmptyResponse {}; Ok(Ok(response)) } } -#[async_trait] -impl Handler for ControlPlane { - type Reply = ControlPlaneResult; - - async fn handle( - &mut self, - request: NotifyIndexChangeRequest, - _: &ActorContext, - ) -> Result { - debug!("Index change notification: schedule indexing plan."); - // TODO: Switch from async (`send_message`) to sync (`ask_for_res`). 
- self.indexing_scheduler_mailbox - .send_message(request) - .await - .context("error sending index change notification to index scheduler")?; - Ok(Ok(NotifyIndexChangeResponse {})) - } -} - #[async_trait] impl Handler for ControlPlane { type Reply = ControlPlaneResult; @@ -305,10 +344,12 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: GetOpenShardsRequest, - _: &ActorContext, + ctx: &ActorContext, ) -> Result { - let response = handle_ask_res!(self.ingest_controller_mailbox.ask_for_res(request).await); - Ok(Ok(response)) + Ok(self + .ingest_controller + .get_open_shards(request, ctx.progress()) + .await) } } @@ -319,10 +360,15 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: CloseShardsRequest, - _: &ActorContext, - ) -> Result { - let response = handle_ask_res!(self.ingest_controller_mailbox.ask_for_res(request).await); - Ok(Ok(response)) + ctx: &ActorContext, + ) -> Result, ActorExitStatus> { + // TODO decide on what the error should be. + let close_shards_resp = self + .ingest_controller + .close_shards(request, ctx.progress()) + .await + .context("Failed to close shards")?; + Ok(Ok(close_shards_resp)) } } diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 1a4afaf580d..f52dc57fdd1 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -25,10 +25,8 @@ use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; -use anyhow::Context; -use async_trait::async_trait; use itertools::Itertools; -use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler}; +use quickwit_common::Progress; use quickwit_config::INGEST_SOURCE_ID; use quickwit_ingest::IngesterPool; use quickwit_metastore::{ListIndexesQuery, Metastore}; @@ -38,14 +36,11 @@ use quickwit_proto::control_plane::{ }; use quickwit_proto::ingest::ingester::{IngesterService, PingRequest}; use quickwit_proto::ingest::{IngestV2Error, Shard, ShardState}; -use quickwit_proto::metastore::events::{ - AddSourceEvent, CreateIndexEvent, DeleteIndexEvent, DeleteSourceEvent, -}; use quickwit_proto::metastore::{EntityKind, MetastoreError}; use quickwit_proto::types::{IndexId, NodeId, SourceId}; use quickwit_proto::{metastore, IndexUid, NodeIdRef, ShardId}; use rand::seq::SliceRandom; -use serde_json::{json, Value as JsonValue}; +use serde::Serialize; use tokio::time::timeout; use tracing::{error, info}; @@ -61,7 +56,7 @@ type NextShardId = ShardId; struct ShardEntry(Shard); impl fmt::Debug for ShardEntry { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("ShardEntry") .field("shard_id", &self.0.shard_id) .field("leader_id", &self.0.leader_id) @@ -126,7 +121,13 @@ struct ShardTable { } impl ShardTable { + fn len(&self) -> usize { + self.table_entries.len() + } + /// Adds a new empty entry for the given index and source. + /// + /// TODO check and document the behavior on error (if the source was already here). fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { let key = (index_uid.clone(), source_id.clone()); let table_entry = ShardTableEntry::default(); @@ -143,6 +144,11 @@ impl ShardTable { } } + fn remove_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + let key = (index_uid.clone(), source_id.clone()); + self.table_entries.remove(&key); + } + /// Clears the shard table. 
fn clear(&mut self) { self.table_entries.clear(); @@ -214,7 +220,6 @@ impl ShardTable { /// table. fn remove_shards(&mut self, index_uid: &IndexUid, source_id: &SourceId, shard_ids: &[ShardId]) { let key = (index_uid.clone(), source_id.clone()); - if let Some(table_entry) = self.table_entries.get_mut(&key) { table_entry .shard_entries @@ -227,11 +232,6 @@ impl ShardTable { self.table_entries .retain(|(index_uid, _), _| index_uid.index_id() != index_id); } - - fn remove_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { - let key = (index_uid.clone(), source_id.clone()); - self.table_entries.remove(&key); - } } pub struct IngestController { @@ -242,6 +242,18 @@ pub struct IngestController { replication_factor: usize, } +impl fmt::Debug for IngestController { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("IngestController") + .field("replication", &self.metastore) + .field("ingester_pool", &self.ingester_pool) + .field("index_table.len", &self.index_table.len()) + .field("shard_table.len", &self.shard_table.len()) + .field("replication_factor", &self.replication_factor) + .finish() + } +} + impl IngestController { pub fn new( metastore: Arc, @@ -257,14 +269,14 @@ impl IngestController { } } - async fn load_state(&mut self, ctx: &ActorContext) -> ControlPlaneResult<()> { + pub(crate) async fn load_state(&mut self, progress: &Progress) -> ControlPlaneResult<()> { info!("syncing internal state with metastore"); let now = Instant::now(); self.index_table.clear(); self.shard_table.clear(); - let indexes = ctx + let indexes = progress .protect_future(self.metastore.list_indexes_metadatas(ListIndexesQuery::All)) .await?; @@ -294,7 +306,7 @@ impl IngestController { } if !subrequests.is_empty() { let list_shards_request = metastore::ListShardsRequest { subrequests }; - let list_shard_response = ctx + let list_shard_response = progress .protect_future(self.metastore.list_shards(list_shards_request)) .await?; @@ -337,9 +349,9 @@ impl IngestController { /// well. async fn ping_leader_and_follower( &mut self, - ctx: &ActorContext, leader_id: &NodeId, follower_id_opt: Option<&NodeId>, + progress: &Progress, ) -> Result<(), PingError> { let mut leader_ingester = self .ingester_pool @@ -353,7 +365,7 @@ impl IngestController { .cloned() .map(|follower_id| follower_id.into()), }; - ctx.protect_future(timeout( + progress.protect_future(timeout( PING_LEADER_TIMEOUT, leader_ingester.ping(ping_request), )) @@ -374,8 +386,8 @@ impl IngestController { /// 1, only a leader is returned. If no nodes are available, `None` is returned. async fn find_leader_and_follower( &mut self, - ctx: &ActorContext, unavailable_ingesters: &mut HashSet, + progress: &Progress, ) -> Option<(NodeId, Option)> { let mut candidates: Vec = self.ingester_pool.keys().await; candidates.retain(|node_id| !unavailable_ingesters.contains(node_id)); @@ -390,7 +402,7 @@ impl IngestController { continue; } if self - .ping_leader_and_follower(ctx, &leader_id, None) + .ping_leader_and_follower(&leader_id, None, progress) .await .is_ok() { @@ -407,7 +419,7 @@ impl IngestController { continue; } match self - .ping_leader_and_follower(ctx, &leader_id, Some(&follower_id)) + .ping_leader_and_follower(&leader_id, Some(&follower_id), progress) .await { Ok(_) => return Some((leader_id, Some(follower_id))), @@ -430,10 +442,10 @@ impl IngestController { /// ingest router. First, the control plane checks its internal shard table to find /// candidates. 
If it does not contain any, the control plane will ask /// the metastore to open new shards. - async fn get_open_shards( + pub(crate) async fn get_open_shards( &mut self, - ctx: &ActorContext, get_open_shards_request: GetOpenShardsRequest, + progress: &Progress, ) -> ControlPlaneResult { let mut get_open_shards_subresponses = Vec::with_capacity(get_open_shards_request.subrequests.len()); @@ -480,7 +492,7 @@ impl IngestController { // TODO: Find leaders in batches. // TODO: Round-robin leader-follower pairs or choose according to load. let (leader_id, follower_id) = self - .find_leader_and_follower(ctx, &mut unavailable_ingesters) + .find_leader_and_follower(&mut unavailable_ingesters, progress) .await .ok_or_else(|| { ControlPlaneError::Unavailable("no available ingester".to_string()) @@ -499,7 +511,7 @@ impl IngestController { let open_shards_request = metastore::OpenShardsRequest { subrequests: open_shards_subrequests, }; - let open_shards_response = ctx + let open_shards_response = progress .protect_future(self.metastore.open_shards(open_shards_request)) .await?; for open_shards_subresponse in &open_shards_response.subresponses { @@ -526,10 +538,10 @@ impl IngestController { Ok(get_open_shards_response) } - async fn close_shards( + pub(crate) async fn close_shards( &mut self, - ctx: &ActorContext, close_shards_request: CloseShardsRequest, + progress: &Progress, ) -> ControlPlaneResult { let mut close_shards_subrequests = Vec::with_capacity(close_shards_request.subrequests.len()); @@ -547,7 +559,7 @@ impl IngestController { let metastore_close_shards_request = metastore::CloseShardsRequest { subrequests: close_shards_subrequests, }; - let close_shards_response = ctx + let close_shards_response = progress .protect_future(self.metastore.close_shards(metastore_close_shards_request)) .await?; for close_shards_success in close_shards_response.successes { @@ -569,123 +581,42 @@ enum PingError { FollowerUnavailable, } -#[async_trait] -impl Actor for IngestController { - type ObservableState = JsonValue; - - fn observable_state(&self) -> Self::ObservableState { - json!({ - "num_indexes": self.index_table.len(), - }) - } - - fn name(&self) -> String { - "IngestController".to_string() - } - - async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { - self.load_state(ctx) - .await - .context("failed to initialize ingest controller")?; - Ok(()) - } +#[derive(Debug, Clone, Serialize, Default)] +pub struct IngestControllerState { + pub num_indexes: usize, } -#[async_trait] -impl Handler for IngestController { - type Reply = ControlPlaneResult<()>; - - async fn handle( - &mut self, - event: CreateIndexEvent, - _ctx: &ActorContext, - ) -> Result { - let index_uid = event.index_uid; +impl IngestController { + pub(crate) fn add_index(&mut self, index_uid: IndexUid) { + let index_uid = index_uid; let index_id = index_uid.index_id().to_string(); - self.index_table.insert(index_id, index_uid); - Ok(Ok(())) } -} - -#[async_trait] -impl Handler for IngestController { - type Reply = ControlPlaneResult<()>; - async fn handle( - &mut self, - event: DeleteIndexEvent, - _ctx: &ActorContext, - ) -> Result { + pub(crate) fn delete_index(&mut self, index_uid: &IndexUid) { // TODO: We need to let the routers and ingesters know. 
- self.index_table.remove(event.index_uid.index_id()); - self.shard_table.remove_index(event.index_uid.index_id()); - Ok(Ok(())) - } -} - -#[async_trait] -impl Handler for IngestController { - type Reply = ControlPlaneResult<()>; - - async fn handle( - &mut self, - event: AddSourceEvent, - _ctx: &ActorContext, - ) -> Result { - self.shard_table - .add_source(&event.index_uid, &event.source_id); - Ok(Ok(())) + self.index_table.remove(index_uid.index_id()); + self.shard_table.remove_index(index_uid.index_id()); } -} -#[async_trait] -impl Handler for IngestController { - type Reply = ControlPlaneResult<()>; - - async fn handle( - &mut self, - event: DeleteSourceEvent, - _ctx: &ActorContext, - ) -> Result { - self.shard_table - .remove_source(&event.index_uid, &event.source_id); - Ok(Ok(())) + pub(crate) fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + self.shard_table.add_source(index_uid, source_id); } -} -#[async_trait] -impl Handler for IngestController { - type Reply = ControlPlaneResult; - - async fn handle( - &mut self, - request: GetOpenShardsRequest, - ctx: &ActorContext, - ) -> Result { - let response_res = self.get_open_shards(ctx, request).await; - Ok(response_res) + pub(crate) fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + self.shard_table.remove_source(index_uid, source_id); } -} -#[async_trait] -impl Handler for IngestController { - type Reply = ControlPlaneResult; - - async fn handle( - &mut self, - request: CloseShardsRequest, - ctx: &ActorContext, - ) -> Result { - let response_res = self.close_shards(ctx, request).await; - Ok(response_res) + pub fn observable_state(&self) -> IngestControllerState { + IngestControllerState { + num_indexes: self.index_table.len(), + } } } #[cfg(test)] mod tests { - use quickwit_actors::Universe; use quickwit_config::SourceConfig; use quickwit_metastore::{IndexMetadata, MockMetastore}; use quickwit_proto::control_plane::GetOpenShardsSubrequest; @@ -693,7 +624,6 @@ mod tests { IngesterServiceClient, MockIngesterService, PingResponse, }; use quickwit_proto::ingest::{IngestV2Error, Shard}; - use tokio::sync::watch; use super::*; @@ -894,10 +824,7 @@ mod tests { #[tokio::test] async fn test_ingest_controller_load_shard_table() { - let universe = Universe::with_accelerated_time(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mut mock_metastore = MockMetastore::default(); mock_metastore @@ -952,7 +879,7 @@ mod tests { let mut ingest_controller = IngestController::new(metastore, ingester_pool, replication_factor); - ingest_controller.load_state(&ctx).await.unwrap(); + ingest_controller.load_state(&progress).await.unwrap(); assert_eq!(ingest_controller.index_table.len(), 2); assert_eq!( @@ -990,10 +917,7 @@ mod tests { #[tokio::test] async fn test_ingest_controller_ping_leader() { - let universe = Universe::new(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mock_metastore = MockMetastore::default(); let metastore = Arc::new(mock_metastore); @@ -1004,7 +928,7 @@ mod tests { let leader_id: NodeId = "test-ingester-0".into(); let error = ingest_controller - .ping_leader_and_follower(&ctx, 
&leader_id, None) + .ping_leader_and_follower(&leader_id, None, &progress) .await .unwrap_err(); assert!(matches!(error, PingError::LeaderUnavailable)); @@ -1022,7 +946,7 @@ mod tests { .await; ingest_controller - .ping_leader_and_follower(&ctx, &leader_id, None) + .ping_leader_and_follower(&leader_id, None, &progress) .await .unwrap(); @@ -1042,7 +966,7 @@ mod tests { .await; let error = ingest_controller - .ping_leader_and_follower(&ctx, &leader_id, None) + .ping_leader_and_follower(&leader_id, None, &progress) .await .unwrap_err(); assert!(matches!(error, PingError::LeaderUnavailable)); @@ -1064,7 +988,7 @@ mod tests { let follower_id: NodeId = "test-ingester-1".into(); let error = ingest_controller - .ping_leader_and_follower(&ctx, &leader_id, Some(&follower_id)) + .ping_leader_and_follower(&leader_id, Some(&follower_id), &progress) .await .unwrap_err(); assert!(matches!(error, PingError::FollowerUnavailable)); @@ -1072,10 +996,7 @@ mod tests { #[tokio::test] async fn test_ingest_controller_find_leader_replication_factor_1() { - let universe = Universe::with_accelerated_time(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mock_metastore = MockMetastore::default(); let metastore = Arc::new(mock_metastore); @@ -1085,7 +1006,7 @@ mod tests { IngestController::new(metastore, ingester_pool.clone(), replication_factor); let leader_follower_pair = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await; assert!(leader_follower_pair.is_none()); @@ -1102,7 +1023,7 @@ mod tests { .await; let leader_follower_pair = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await; assert!(leader_follower_pair.is_none()); @@ -1119,7 +1040,7 @@ mod tests { .await; let (leader_id, follower_id) = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await .unwrap(); assert_eq!(leader_id.as_str(), "test-ingester-1"); @@ -1128,10 +1049,7 @@ mod tests { #[tokio::test] async fn test_ingest_controller_find_leader_replication_factor_2() { - let universe = Universe::with_accelerated_time(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mock_metastore = MockMetastore::default(); let metastore = Arc::new(mock_metastore); @@ -1141,7 +1059,7 @@ mod tests { IngestController::new(metastore, ingester_pool.clone(), replication_factor); let leader_follower_pair = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await; assert!(leader_follower_pair.is_none()); @@ -1169,7 +1087,7 @@ mod tests { .await; let leader_follower_pair = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await; assert!(leader_follower_pair.is_none()); @@ -1203,7 +1121,7 @@ mod tests { .await; let (leader_id, follower_id) = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut 
HashSet::new(), &progress) .await .unwrap(); assert_eq!(leader_id.as_str(), "test-ingester-0"); @@ -1220,10 +1138,7 @@ mod tests { let source_id = "test-source".to_string(); - let universe = Universe::with_accelerated_time(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mut mock_metastore = MockMetastore::default(); mock_metastore @@ -1308,7 +1223,7 @@ mod tests { unavailable_ingesters: Vec::new(), }; let response = ingest_controller - .get_open_shards(&ctx, request) + .get_open_shards(request, &progress) .await .unwrap(); assert_eq!(response.subresponses.len(), 0); @@ -1329,7 +1244,7 @@ mod tests { unavailable_ingesters, }; let response = ingest_controller - .get_open_shards(&ctx, request) + .get_open_shards(request, &progress) .await .unwrap(); assert_eq!(response.subresponses.len(), 2); diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs index 650204be433..21958f3bb89 100644 --- a/quickwit/quickwit-control-plane/src/lib.rs +++ b/quickwit/quickwit-control-plane/src/lib.rs @@ -22,18 +22,8 @@ pub mod indexing_plan; pub mod ingest; pub mod scheduler; -use async_trait::async_trait; -use quickwit_common::pubsub::EventSubscriber; use quickwit_common::tower::Pool; -use quickwit_proto::control_plane::{ - ControlPlaneService, ControlPlaneServiceClient, NotifyIndexChangeRequest, -}; use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask}; -use quickwit_proto::metastore::events::{ - AddSourceEvent, CreateIndexEvent, DeleteIndexEvent, DeleteSourceEvent, ToggleSourceEvent, -}; -use quickwit_proto::metastore::SourceType; -use tracing::error; /// Indexer-node specific information stored in the pool of available indexer nodes #[derive(Debug, Clone)] @@ -44,113 +34,5 @@ pub struct IndexerNodeInfo { pub type IndexerPool = Pool; -/// Subscribes to various metastore events and forwards them to the control plane using the inner -/// client. The actual subscriptions are set up in `quickwit-serve`. -#[derive(Debug, Clone)] -pub struct ControlPlaneEventSubscriber(ControlPlaneServiceClient); - -impl ControlPlaneEventSubscriber { - pub fn new(control_plane: ControlPlaneServiceClient) -> Self { - Self(control_plane) - } - - pub(crate) async fn notify_index_change(&mut self, event_name: &'static str) { - if let Err(error) = self - .0 - .notify_index_change(NotifyIndexChangeRequest {}) - .await - { - error!(error=?error, event=event_name, "failed to notify control plane of index change"); - } - } -} - -/// Notify the control plane when one of the following event occurs: -/// - an index is deleted. -/// - a source, other than the ingest CLI source, is created. -/// - a source is deleted. -/// Note: we don't need to send an event to the control plane on index creation. -/// A new index has no source and thus will not change the scheduling of indexing tasks. -// TODO(fmassot): -// - Forbid a `MetastoreWithControlPlaneTriggers` that wraps a gRPC client metastore. -// - We don't sent any data to the Control Plane. It could be nice to send the relevant data to the -// control plane and let it decide to schedule or not indexing tasks. 
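The subscriber indirection deleted here existed only to tell the control plane "something changed". After this refactor, index and source mutations are sent to the control plane itself, which persists them in the metastore and updates its in-memory state in the same handler. A sketch of the new call path, assuming a mailbox obtained from `ControlPlane::spawn` (any `CreateIndexRequest` fields other than `index_config_json` are left at their defaults; in a real deployment the request would typically arrive through the `ControlPlaneServiceClient` gRPC layer rather than a local mailbox):

```rust
use anyhow::anyhow;
use quickwit_actors::Mailbox;
use quickwit_proto::metastore::{CreateIndexRequest, CreateIndexResponse};

use crate::control_plane::ControlPlane;

// Create an index by asking the control plane actor directly: the handler
// writes to the metastore, then updates the ingest controller's index table
// in-process -- no event bus or separate notification round-trip involved.
async fn create_index_via_control_plane(
    control_plane_mailbox: &Mailbox<ControlPlane>,
    index_config_json: String,
) -> anyhow::Result<CreateIndexResponse> {
    let request = CreateIndexRequest {
        index_config_json,
        ..Default::default()
    };
    control_plane_mailbox
        .ask_for_res(request)
        .await
        .map_err(|error| anyhow!(error))
}
```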
-#[async_trait] -impl EventSubscriber for ControlPlaneEventSubscriber { - async fn handle_event(&mut self, _event: CreateIndexEvent) { - self.notify_index_change("create-index").await; - } -} - -#[async_trait] -impl EventSubscriber for ControlPlaneEventSubscriber { - async fn handle_event(&mut self, _event: DeleteIndexEvent) { - self.notify_index_change("delete-index").await; - } -} - -#[async_trait] -impl EventSubscriber for ControlPlaneEventSubscriber { - async fn handle_event(&mut self, event: AddSourceEvent) { - if !matches!(event.source_type, SourceType::Cli | SourceType::File) { - self.notify_index_change("add-source").await; - }; - } -} - -#[async_trait] -impl EventSubscriber for ControlPlaneEventSubscriber { - async fn handle_event(&mut self, _event: ToggleSourceEvent) { - self.notify_index_change("toggle-source").await; - } -} - -#[async_trait] -impl EventSubscriber for ControlPlaneEventSubscriber { - async fn handle_event(&mut self, _event: DeleteSourceEvent) { - self.notify_index_change("delete-source").await; - } -} - #[cfg(test)] -mod tests { - - use quickwit_proto::control_plane::NotifyIndexChangeResponse; - use quickwit_proto::metastore::SourceType; - use quickwit_proto::IndexUid; - - use super::*; - - #[tokio::test] - async fn test_metastore_event_handler() { - let mut mock = ControlPlaneServiceClient::mock(); - mock.expect_notify_index_change() - .return_once(|_| Ok(NotifyIndexChangeResponse {})); - - let mut control_plane_event_subscriber = - ControlPlaneEventSubscriber(ControlPlaneServiceClient::new(mock)); - - let index_uid = IndexUid::new("test-index"); - - let event = AddSourceEvent { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - source_type: SourceType::Cli, - }; - control_plane_event_subscriber.handle_event(event).await; - - let event = AddSourceEvent { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - source_type: SourceType::File, - }; - control_plane_event_subscriber.handle_event(event).await; - - let event = AddSourceEvent { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - source_type: SourceType::IngestV2, - }; - control_plane_event_subscriber.handle_event(event).await; - } -} +mod tests; diff --git a/quickwit/quickwit-control-plane/src/scheduler.rs b/quickwit/quickwit-control-plane/src/scheduler.rs index 5b519217ec9..bfe2160c817 100644 --- a/quickwit/quickwit-control-plane/src/scheduler.rs +++ b/quickwit/quickwit-control-plane/src/scheduler.rs @@ -24,14 +24,9 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::Context; -use async_trait::async_trait; use itertools::Itertools; -use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler}; use quickwit_config::{SourceConfig, INGEST_SOURCE_ID}; use quickwit_metastore::{ListIndexesQuery, Metastore}; -use quickwit_proto::control_plane::{ - ControlPlaneResult, NotifyIndexChangeRequest, NotifyIndexChangeResponse, -}; use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask}; use quickwit_proto::metastore::{ListShardsRequest, ListShardsSubrequest}; use quickwit_proto::{NodeId, ShardId}; @@ -43,28 +38,12 @@ use crate::indexing_plan::{ }; use crate::{IndexerNodeInfo, IndexerPool}; -/// Interval between two controls (or checks) of the desired plan VS running plan. -const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { - Duration::from_millis(500) -} else { - Duration::from_secs(3) -}; - -/// Interval between two scheduling of indexing plans. 
No need to be faster than the -/// control plan loop. -// Note: it's currently not possible to define a const duration with -// `CONTROL_PLAN_LOOP_INTERVAL * number`. -const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { - Duration::from_secs(3) -} else { - Duration::from_secs(60) -}; - -const MIN_DURATION_BETWEEN_SCHEDULING: Duration = if cfg!(any(test, feature = "testsuite")) { - Duration::from_millis(50) -} else { - Duration::from_secs(30) -}; +pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration = + if cfg!(any(test, feature = "testsuite")) { + Duration::from_millis(50) + } else { + Duration::from_secs(30) + }; #[derive(Debug, Clone, Default, Serialize)] pub struct IndexingSchedulerState { @@ -88,24 +67,28 @@ pub struct IndexingSchedulerState { /// regularly checks if indexers are effectively running their plans (more details in the next /// section). /// -/// The scheduling is executed when the scheduler receives external or internal events and on -/// certains conditions. The following events possibly trigger a scheduling: -/// - [`NotifyIndexChangeRequest`]: this gRPC event is sent by a metastore node and will trigger a -/// scheduling on each event. TODO(fmassot): this can be refined by adding some relevant info to -/// the event, example: the creation of a source of type `void` should not trigger a scheduling. -/// - [`RefreshPlanLoop`]: this event is scheduled every [`REFRESH_PLAN_LOOP_INTERVAL`] and triggers -/// a scheduling. Due to network issues, a control plane will not always receive the gRPC events -/// [`NotifyIndexChangeRequest`] and thus will not be aware of index changes in the metastore. -/// TODO(fmassot): to avoid a scheduling on each [`RefreshPlanLoop`], we can store in the -/// scheduler state a metastore version number that will be compared to the number stored in the -/// metastore itself. -/// - [`ControlPlanLoop`]: this event is scheduled every [`CONTROL_PLAN_LOOP_INTERVAL`] and control -/// if the `desired plan`, that is the last applied [`PhysicalIndexingPlan`] by the scheduler, and -/// the `running plan`, that is the indexing tasks running on all indexers and retrieved from the -/// chitchat state, are the same: -/// - if node IDs are different, the scheduler will trigger a scheduling. -/// - if indexing tasks are different, the scheduler will apply again the last applied plan. +/// All events altering the list of indexes and sources are proxied +/// through the control plane. The control plane state is therefore guaranteed to be up-to-date +/// (at the cost of making the control plane a single point of failure). /// +/// These events then trigger the production of a new `PhysicalIndexingPlan`. +/// +/// A `ControlPlanLoop` event is scheduled every `CONTROL_PLAN_LOOP_INTERVAL` and steers +/// the cluster toward the last applied [`PhysicalIndexingPlan`]. +/// +/// This physical plan is a desired state. Even after that state is reached, it can be altered due +/// to a faulty server, for instance. +/// +/// We then need to detect deviation, possibly recompute the desired `PhysicalIndexingPlan` +/// and steer the cluster back to the right state. +/// +/// First, to detect deviation, the control plane gathers an eventually consistent view of what is +/// running on the different nodes of the cluster: the `running plan`. This is done via `chitchat`. +/// +/// If the list of node ids has changed, the scheduler will retrigger a scheduling.
+/// If the indexing tasks do not match, the scheduler will apply again the last applied plan. +/// Concretely, it will send the faulty nodes the plan they are supposed to follow. +// /// Finally, in order to give the time for each indexer to run their indexing tasks, the control /// plase will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired /// plan with the running plan. @@ -118,7 +101,7 @@ pub struct IndexingScheduler { } impl fmt::Debug for IndexingScheduler { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("IndexingScheduler") .field("cluster_id", &self.cluster_id) .field("node_id", &self.self_node_id) @@ -131,26 +114,6 @@ impl fmt::Debug for IndexingScheduler { } } -#[async_trait] -impl Actor for IndexingScheduler { - type ObservableState = IndexingSchedulerState; - - fn observable_state(&self) -> Self::ObservableState { - self.state.clone() - } - - fn name(&self) -> String { - "IndexingScheduler".to_string() - } - - async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { - self.handle(RefreshPlanLoop, ctx).await?; - ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) - .await; - Ok(()) - } -} - impl IndexingScheduler { pub fn new( cluster_id: String, @@ -167,7 +130,11 @@ impl IndexingScheduler { } } - async fn schedule_indexing_plan_if_needed(&mut self) -> anyhow::Result<()> { + pub fn observable_state(&self) -> IndexingSchedulerState { + self.state.clone() + } + + pub(crate) async fn schedule_indexing_plan_if_needed(&mut self) -> anyhow::Result<()> { let mut indexers = self.get_indexers_from_indexer_pool().await; if indexers.is_empty() { warn!("No indexer available, cannot schedule an indexing plan."); @@ -283,7 +250,7 @@ impl IndexingScheduler { /// chitchat cluster state. If true, do nothing. /// - If node IDs differ, schedule a new indexing plan. /// - If indexing tasks differ, apply again the last plan.
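Summarizing the loop described above: `control_running_plan` compares the desired plan (the last applied physical plan) with the running plan gathered via chitchat, and either reschedules, reapplies, or does nothing. A condensed sketch of that decision, where `PlanAction`, `compare_plans`, and the map-based plan representation are simplified stand-ins for the scheduler's real types:

```rust
use std::collections::{HashMap, HashSet};

use quickwit_proto::indexing::IndexingTask;

/// Outcome of comparing the desired plan with the running plan.
enum PlanAction {
    /// The set of indexer nodes changed: compute and apply a brand-new plan.
    Reschedule,
    /// Same nodes, but some tasks diverge: resend the last applied plan to
    /// the faulty nodes.
    ReapplyLastPlan,
    /// Desired and running plans match: nothing to do.
    Nothing,
}

fn compare_plans(
    desired: &HashMap<String, Vec<IndexingTask>>,
    running: &HashMap<String, Vec<IndexingTask>>,
) -> PlanAction {
    let desired_nodes: HashSet<&String> = desired.keys().collect();
    let running_nodes: HashSet<&String> = running.keys().collect();
    if desired_nodes != running_nodes {
        return PlanAction::Reschedule;
    }
    let tasks_match = desired
        .iter()
        .all(|(node_id, tasks)| running.get(node_id) == Some(tasks));
    if tasks_match {
        PlanAction::Nothing
    } else {
        PlanAction::ReapplyLastPlan
    }
}
```

The real implementation additionally waits at least `MIN_DURATION_BETWEEN_SCHEDULING` after the last apply before drawing either conclusion, giving indexers time to report their tasks through chitchat.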
- async fn control_running_plan(&mut self) -> anyhow::Result<()> { + pub(crate) async fn control_running_plan(&mut self) -> anyhow::Result<()> { let last_applied_plan = if let Some(last_applied_plan) = self.state.last_applied_physical_plan.as_ref() { last_applied_plan @@ -363,61 +330,13 @@ impl IndexingScheduler { } } -#[async_trait] -impl Handler for IndexingScheduler { - type Reply = ControlPlaneResult; - - async fn handle( - &mut self, - _: NotifyIndexChangeRequest, - _: &ActorContext, - ) -> Result { - debug!("Index change notification: schedule indexing plan."); +impl IndexingScheduler { + // Should be called whenever a change in the list of index/shard + // has happened + pub(crate) async fn on_index_change(&mut self) -> anyhow::Result<()> { self.schedule_indexing_plan_if_needed() .await .context("error when scheduling indexing plan")?; - Ok(Ok(NotifyIndexChangeResponse {})) - } -} - -#[derive(Debug)] -struct ControlPlanLoop; - -#[async_trait] -impl Handler for IndexingScheduler { - type Reply = (); - - async fn handle( - &mut self, - _message: ControlPlanLoop, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - if let Err(error) = self.control_running_plan().await { - error!("Error when controlling the running plan: `{}`.", error); - } - ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) - .await; - Ok(()) - } -} - -#[derive(Debug)] -struct RefreshPlanLoop; - -#[async_trait] -impl Handler for IndexingScheduler { - type Reply = (); - - async fn handle( - &mut self, - _message: RefreshPlanLoop, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - if let Err(error) = self.schedule_indexing_plan_if_needed().await { - error!("Error when scheduling indexing plan: `{}`.", error); - } - ctx.schedule_self_msg(REFRESH_PLAN_LOOP_INTERVAL, RefreshPlanLoop) - .await; Ok(()) } } @@ -583,373 +502,7 @@ fn get_indexing_tasks_diff<'a>( #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; - use std::num::NonZeroUsize; - use std::sync::Arc; - use std::time::Duration; - - use chitchat::transport::ChannelTransport; - use futures::{Stream, StreamExt}; - use quickwit_actors::{ActorHandle, Inbox, Mailbox, Universe}; - use quickwit_cluster::{create_cluster_for_test, Cluster, ClusterChange}; - use quickwit_common::test_utils::wait_until_predicate; - use quickwit_common::tower::{Change, Pool}; - use quickwit_config::service::QuickwitService; - use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams}; - use quickwit_indexing::IndexingService; - use quickwit_metastore::{IndexMetadata, ListIndexesQuery, MockMetastore}; - use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingServiceClient, IndexingTask}; - use quickwit_proto::metastore::ListShardsResponse; - use quickwit_proto::NodeId; - use serde_json::json; - - use super::{IndexingScheduler, CONTROL_PLAN_LOOP_INTERVAL}; - use crate::scheduler::{ - get_indexing_plans_diff, MIN_DURATION_BETWEEN_SCHEDULING, REFRESH_PLAN_LOOP_INTERVAL, - }; - use crate::IndexerNodeInfo; - - fn index_metadata_for_test( - index_id: &str, - source_id: &str, - desired_num_pipelines: usize, - max_num_pipelines_per_indexer: usize, - ) -> IndexMetadata { - let mut index_metadata = IndexMetadata::for_test(index_id, "ram://indexes/test-index"); - let source_config = SourceConfig { - enabled: true, - source_id: source_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(max_num_pipelines_per_indexer) - .unwrap(), - desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), - 
source_params: SourceParams::Kafka(KafkaSourceParams { - topic: "topic".to_string(), - client_log_level: None, - client_params: json!({ - "bootstrap.servers": "localhost:9092", - }), - enable_backfill_mode: true, - }), - transform_config: None, - input_format: SourceInputFormat::Json, - }; - index_metadata - .sources - .insert(source_id.to_string(), source_config); - index_metadata - } - - pub fn test_indexer_change_stream( - cluster_change_stream: impl Stream + Send + 'static, - indexing_clients: HashMap>, - ) -> impl Stream> + Send + 'static { - cluster_change_stream.filter_map(move |cluster_change| { - let indexing_clients = indexing_clients.clone(); - Box::pin(async move { - match cluster_change { - ClusterChange::Add(node) - if node.enabled_services().contains(&QuickwitService::Indexer) => - { - let node_id = node.node_id().to_string(); - let indexing_tasks = node.indexing_tasks().to_vec(); - let client_mailbox = indexing_clients.get(&node_id).unwrap().clone(); - let client = IndexingServiceClient::from_mailbox(client_mailbox); - Some(Change::Insert( - node_id, - IndexerNodeInfo { - client, - indexing_tasks, - }, - )) - } - ClusterChange::Remove(node) => Some(Change::Remove(node.node_id().to_string())), - _ => None, - } - }) - }) - } - - async fn start_scheduler( - cluster: Cluster, - indexers: &[&Cluster], - universe: &Universe, - ) -> (Vec>, ActorHandle) { - let index_1 = "test-indexing-plan-1"; - let source_1 = "source-1"; - let index_2 = "test-indexing-plan-2"; - let source_2 = "source-2"; - let index_metadata_1 = index_metadata_for_test(index_1, source_1, 2, 2); - let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1); - index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1; - let mut metastore = MockMetastore::default(); - metastore.expect_list_indexes_metadatas().returning( - move |_list_indexes_query: ListIndexesQuery| { - Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()]) - }, - ); - metastore.expect_list_shards().returning(|_| { - Ok(ListShardsResponse { - subresponses: Vec::new(), - }) - }); - let mut indexer_inboxes = Vec::new(); - let indexer_pool = Pool::default(); - let change_stream = cluster.ready_nodes_change_stream().await; - let mut indexing_clients = HashMap::new(); - for indexer in indexers { - let (indexing_service_mailbox, indexing_service_inbox) = universe.create_test_mailbox(); - indexing_clients.insert(indexer.self_node_id().to_string(), indexing_service_mailbox); - indexer_inboxes.push(indexing_service_inbox); - } - let indexer_change_stream = test_indexer_change_stream(change_stream, indexing_clients); - indexer_pool.listen_for_changes(indexer_change_stream); - - let self_node_id: NodeId = cluster.self_node_id().to_string().into(); - let indexing_scheduler = IndexingScheduler::new( - cluster.cluster_id().to_string(), - self_node_id, - Arc::new(metastore), - indexer_pool, - ); - let (_, scheduler_handler) = universe.spawn_builder().spawn(indexing_scheduler); - (indexer_inboxes, scheduler_handler) - } - - #[tokio::test] - async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = - create_cluster_for_test(Vec::new(), &["indexer", "control_plane"], &transport, true) - .await - .unwrap(); - cluster - .wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5)) - .await - .unwrap(); - let universe = Universe::with_accelerated_time(); - let (indexing_service_inboxes, 
scheduler_handler) = - start_scheduler(cluster.clone(), &[&cluster.clone()], &universe).await; - let indexing_service_inbox = indexing_service_inboxes[0].clone(); - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - assert!(scheduler_state.last_applied_physical_plan.is_some()); - assert_eq!(indexing_service_inbox_messages.len(), 1); - - // After a CONTROL_PLAN_LOOP_INTERVAL, the control loop will check if the desired plan is - // running on the indexer. As chitchat state of the indexer is not updated (we did - // not instantiate a indexing service for that), the control loop will apply again - // the same plan. - // Check first the plan is not updated before `MIN_DURATION_BETWEEN_SCHEDULING`. - tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.5)).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); - - // After `MIN_DURATION_BETWEEN_SCHEDULING`, we should see a plan update. - tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.7)).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 2); - assert_eq!(indexing_service_inbox_messages.len(), 1); - let indexing_tasks = indexing_service_inbox_messages - .first() - .unwrap() - .indexing_tasks - .clone(); - - // Update the indexer state and check that the indexer does not receive any new - // `ApplyIndexingPlanRequest`. - cluster - .update_self_node_indexing_tasks(&indexing_tasks) - .await - .unwrap(); - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 2); - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); - assert_eq!(indexing_service_inbox_messages.len(), 0); - - // Update the indexer state with a different plan and check that the indexer does now - // receive a new `ApplyIndexingPlanRequest`. - cluster - .update_self_node_indexing_tasks(&[indexing_tasks[0].clone()]) - .await - .unwrap(); - tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(1.2)).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 3); - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); - assert_eq!(indexing_service_inbox_messages.len(), 1); - universe.assert_quit().await; - } - - #[tokio::test] - async fn test_scheduler_scheduling_no_indexer() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) - .await - .unwrap(); - let universe = Universe::with_accelerated_time(); - let (indexing_service_inboxes, scheduler_handler) = - start_scheduler(cluster.clone(), &[], &universe).await; - assert_eq!(indexing_service_inboxes.len(), 0); - - // No indexer. 
- universe.sleep(CONTROL_PLAN_LOOP_INTERVAL).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); - - // Wait REFRESH_PLAN_LOOP_INTERVAL * 2, as there is no indexer, we should observe no - // scheduling. - universe.sleep(REFRESH_PLAN_LOOP_INTERVAL * 2).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); - universe.assert_quit().await; - } - - #[tokio::test] - async fn test_scheduler_scheduling_multiple_indexers() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) - .await - .unwrap(); - let cluster_indexer_1 = create_cluster_for_test( - vec![cluster.gossip_advertise_addr().to_string()], - &["indexer"], - &transport, - true, - ) - .await - .unwrap(); - let cluster_indexer_2 = create_cluster_for_test( - vec![cluster.gossip_advertise_addr().to_string()], - &["indexer"], - &transport, - true, - ) - .await - .unwrap(); - let universe = Universe::new(); - let (indexing_service_inboxes, scheduler_handler) = start_scheduler( - cluster.clone(), - &[&cluster_indexer_1, &cluster_indexer_2], - &universe, - ) - .await; - let indexing_service_inbox_1 = indexing_service_inboxes[0].clone(); - let indexing_service_inbox_2 = indexing_service_inboxes[1].clone(); - let scheduler_handler_arc = Arc::new(scheduler_handler); - - // No indexer. - let scheduler_state = scheduler_handler_arc.process_pending_and_observe().await; - let indexing_service_inbox_messages = - indexing_service_inbox_1.drain_for_test_typed::(); - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); - assert_eq!(indexing_service_inbox_messages.len(), 0); - - cluster - .wait_for_ready_members( - |members| { - members - .iter() - .any(|member| member.enabled_services.contains(&QuickwitService::Indexer)) - }, - Duration::from_secs(5), - ) - .await - .unwrap(); - - // Wait for chitchat update, sheduler will detect new indexers and schedule a plan. 
- wait_until_predicate( - || { - let scheduler_handler_arc_clone = scheduler_handler_arc.clone(); - async move { - let scheduler_state = scheduler_handler_arc_clone - .process_pending_and_observe() - .await; - scheduler_state.num_schedule_indexing_plan == 1 - } - }, - CONTROL_PLAN_LOOP_INTERVAL * 4, - Duration::from_millis(100), - ) - .await - .unwrap(); - let scheduler_state = scheduler_handler_arc.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); - let indexing_service_inbox_messages_1 = - indexing_service_inbox_1.drain_for_test_typed::(); - let indexing_service_inbox_messages_2 = - indexing_service_inbox_2.drain_for_test_typed::(); - assert_eq!(indexing_service_inbox_messages_1.len(), 1); - assert_eq!(indexing_service_inbox_messages_2.len(), 1); - cluster_indexer_1 - .update_self_node_indexing_tasks(&indexing_service_inbox_messages_1[0].indexing_tasks) - .await - .unwrap(); - cluster_indexer_2 - .update_self_node_indexing_tasks(&indexing_service_inbox_messages_2[0].indexing_tasks) - .await - .unwrap(); - - // Wait 2 CONTROL_PLAN_LOOP_INTERVAL again and check the scheduler will not apply the plan - // several times. - universe.sleep(CONTROL_PLAN_LOOP_INTERVAL * 2).await; - let scheduler_state = scheduler_handler_arc.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - - // Shutdown cluster and wait until the new scheduling. - cluster_indexer_2.shutdown().await; - - cluster - .wait_for_ready_members( - |members| { - members - .iter() - .filter(|member| { - member.enabled_services.contains(&QuickwitService::Indexer) - }) - .count() - == 1 - }, - Duration::from_secs(5), - ) - .await - .unwrap(); - - wait_until_predicate( - || { - let scheduler_handler_arc_clone = scheduler_handler_arc.clone(); - async move { - let scheduler_state = scheduler_handler_arc_clone - .process_pending_and_observe() - .await; - scheduler_state.num_schedule_indexing_plan == 2 - } - }, - CONTROL_PLAN_LOOP_INTERVAL * 10, - Duration::from_millis(100), - ) - .await - .unwrap(); - - universe.assert_quit().await; - } + use super::*; #[test] fn test_indexing_plans_diff() { diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs new file mode 100644 index 00000000000..7ed4f4521b3 --- /dev/null +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -0,0 +1,431 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use std::collections::HashMap; +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::time::Duration; + +use chitchat::transport::ChannelTransport; +use futures::{Stream, StreamExt}; +use quickwit_actors::{ActorHandle, Inbox, Mailbox, Universe}; +use quickwit_cluster::{create_cluster_for_test, Cluster, ClusterChange}; +use quickwit_common::test_utils::wait_until_predicate; +use quickwit_common::tower::{Change, Pool}; +use quickwit_config::service::QuickwitService; +use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams}; +use quickwit_indexing::IndexingService; +use quickwit_metastore::{IndexMetadata, ListIndexesQuery, MockMetastore}; +use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingServiceClient}; +use quickwit_proto::metastore::ListShardsResponse; +use quickwit_proto::NodeId; +use serde_json::json; + +use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL, REFRESH_PLAN_LOOP_INTERVAL}; +use crate::scheduler::MIN_DURATION_BETWEEN_SCHEDULING; +use crate::IndexerNodeInfo; + +fn index_metadata_for_test( + index_id: &str, + source_id: &str, + desired_num_pipelines: usize, + max_num_pipelines_per_indexer: usize, +) -> IndexMetadata { + let mut index_metadata = IndexMetadata::for_test(index_id, "ram://indexes/test-index"); + let source_config = SourceConfig { + enabled: true, + source_id: source_id.to_string(), + max_num_pipelines_per_indexer: NonZeroUsize::new(max_num_pipelines_per_indexer).unwrap(), + desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), + source_params: SourceParams::Kafka(KafkaSourceParams { + topic: "topic".to_string(), + client_log_level: None, + client_params: json!({ + "bootstrap.servers": "localhost:9092", + }), + enable_backfill_mode: true, + }), + transform_config: None, + input_format: SourceInputFormat::Json, + }; + index_metadata + .sources + .insert(source_id.to_string(), source_config); + index_metadata +} + +pub fn test_indexer_change_stream( + cluster_change_stream: impl Stream + Send + 'static, + indexing_clients: HashMap>, +) -> impl Stream> + Send + 'static { + cluster_change_stream.filter_map(move |cluster_change| { + let indexing_clients = indexing_clients.clone(); + Box::pin(async move { + match cluster_change { + ClusterChange::Add(node) + if node.enabled_services().contains(&QuickwitService::Indexer) => + { + let node_id = node.node_id().to_string(); + let indexing_tasks = node.indexing_tasks().to_vec(); + let client_mailbox = indexing_clients.get(&node_id).unwrap().clone(); + let client = IndexingServiceClient::from_mailbox(client_mailbox); + Some(Change::Insert( + node_id, + IndexerNodeInfo { + client, + indexing_tasks, + }, + )) + } + ClusterChange::Remove(node) => Some(Change::Remove(node.node_id().to_string())), + _ => None, + } + }) + }) +} + +async fn start_control_plane( + cluster: Cluster, + indexers: &[&Cluster], + universe: &Universe, +) -> (Vec>, ActorHandle) { + let index_1 = "test-indexing-plan-1"; + let source_1 = "source-1"; + let index_2 = "test-indexing-plan-2"; + let source_2 = "source-2"; + let index_metadata_1 = index_metadata_for_test(index_1, source_1, 2, 2); + let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1); + index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1; + let mut metastore = MockMetastore::default(); + metastore.expect_list_indexes_metadatas().returning( + move |_list_indexes_query: ListIndexesQuery| { + Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()]) + }, + ); + 
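
The `test_indexer_change_stream` helper above maps cluster membership changes onto indexer-pool changes with `futures::StreamExt::filter_map`, where the closure returns a boxed future producing an `Option`: items mapped to `None` are silently dropped from the output stream. A minimal, self-contained sketch of that same combinator pattern, on toy data unrelated to Quickwit:

    use futures::executor::block_on;
    use futures::stream::{self, StreamExt};

    fn main() {
        // Keep even numbers, double them, and drop everything else, boxing the
        // async closure the same way `test_indexer_change_stream` does.
        let doubled_evens: Vec<u32> = block_on(
            stream::iter(1u32..=6)
                .filter_map(|n| Box::pin(async move { (n % 2 == 0).then_some(n * 2) }))
                .collect(),
        );
        assert_eq!(doubled_evens, vec![4, 8, 12]);
    }
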
metastore.expect_list_shards().returning(|_| {
+ Ok(ListShardsResponse {
+ subresponses: Vec::new(),
+ })
+ });
+ let mut indexer_inboxes = Vec::new();
+
+ let indexer_pool = Pool::default();
+ let ingester_pool = Pool::default();
+ let change_stream = cluster.ready_nodes_change_stream().await;
+ let mut indexing_clients = HashMap::new();
+
+ for indexer in indexers {
+ let (indexing_service_mailbox, indexing_service_inbox) = universe.create_test_mailbox();
+ indexing_clients.insert(indexer.self_node_id().to_string(), indexing_service_mailbox);
+ indexer_inboxes.push(indexing_service_inbox);
+ }
+ let indexer_change_stream = test_indexer_change_stream(change_stream, indexing_clients);
+ indexer_pool.listen_for_changes(indexer_change_stream);
+
+ let self_node_id: NodeId = cluster.self_node_id().to_string().into();
+ let (_, control_plane_handle) = ControlPlane::spawn(
+ universe,
+ cluster.cluster_id().to_string(),
+ self_node_id,
+ indexer_pool,
+ ingester_pool,
+ Arc::new(metastore),
+ 1,
+ );
+
+ (indexer_inboxes, control_plane_handle)
+}
+
+#[tokio::test]
+async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() {
+ quickwit_common::setup_logging_for_tests();
+ let transport = ChannelTransport::default();
+ let cluster =
+ create_cluster_for_test(Vec::new(), &["indexer", "control_plane"], &transport, true)
+ .await
+ .unwrap();
+ cluster
+ .wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5))
+ .await
+ .unwrap();
+ let universe = Universe::with_accelerated_time();
+ let (indexing_service_inboxes, control_plane_handler) =
+ start_control_plane(cluster.clone(), &[&cluster.clone()], &universe).await;
+ let indexing_service_inbox = indexing_service_inboxes[0].clone();
+ let scheduler_state = control_plane_handler
+ .process_pending_and_observe()
+ .await
+ .state
+ .indexing_scheduler;
+ let indexing_service_inbox_messages =
+ indexing_service_inbox.drain_for_test_typed::();
+ assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1);
+ assert_eq!(scheduler_state.num_schedule_indexing_plan, 1);
+ assert!(scheduler_state.last_applied_physical_plan.is_some());
+ assert_eq!(indexing_service_inbox_messages.len(), 1);
+
+ // After a CONTROL_PLAN_LOOP_INTERVAL, the control loop checks whether the desired plan is
+ // running on the indexer. As the chitchat state of the indexer is not updated (we did
+ // not instantiate an indexing service for that), the control loop will apply the same
+ // plan again.
+ // First, check that the plan is not reapplied before `MIN_DURATION_BETWEEN_SCHEDULING`.
+ tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.5)).await;
+ let scheduler_state = control_plane_handler
+ .process_pending_and_observe()
+ .await
+ .state
+ .indexing_scheduler;
+ assert_eq!(scheduler_state.num_schedule_indexing_plan, 1);
+ assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1);
+
+ // After `MIN_DURATION_BETWEEN_SCHEDULING`, we should see a plan update.
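
The two `tokio::time::sleep` calls around this point split that window on purpose: sleeping for half of `MIN_DURATION_BETWEEN_SCHEDULING` must not trigger a new plan application, while crossing the full window should. As a rough sketch of the throttling idea only (the guard struct, field name, and 30-second value are placeholders, not the scheduler's actual internals):

    use std::time::{Duration, Instant};

    // Placeholder cooldown; the real value is the `MIN_DURATION_BETWEEN_SCHEDULING`
    // constant imported at the top of this module.
    const COOLDOWN: Duration = Duration::from_secs(30);

    struct PlanApplicationGuard {
        last_applied_at: Option<Instant>,
    }

    impl PlanApplicationGuard {
        // A control loop consulting this guard only re-applies a plan once the
        // cooldown has elapsed since the previous application.
        fn may_apply(&self) -> bool {
            self.last_applied_at
                .map_or(true, |at| at.elapsed() >= COOLDOWN)
        }
    }

    fn main() {
        let guard = PlanApplicationGuard {
            last_applied_at: Some(Instant::now()),
        };
        // Immediately after an application, a second one is suppressed.
        assert!(!guard.may_apply());
    }
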
+ tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.7)).await; + let scheduler_state = control_plane_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + let indexing_service_inbox_messages = + indexing_service_inbox.drain_for_test_typed::(); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 2); + assert_eq!(indexing_service_inbox_messages.len(), 1); + let indexing_tasks = indexing_service_inbox_messages + .first() + .unwrap() + .indexing_tasks + .clone(); + + // Update the indexer state and check that the indexer does not receive any new + // `ApplyIndexingPlanRequest`. + cluster + .update_self_node_indexing_tasks(&indexing_tasks) + .await + .unwrap(); + let scheduler_state = control_plane_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 2); + let indexing_service_inbox_messages = + indexing_service_inbox.drain_for_test_typed::(); + assert_eq!(indexing_service_inbox_messages.len(), 0); + + // Update the indexer state with a different plan and check that the indexer does now + // receive a new `ApplyIndexingPlanRequest`. + cluster + .update_self_node_indexing_tasks(&[indexing_tasks[0].clone()]) + .await + .unwrap(); + tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(1.2)).await; + let scheduler_state = control_plane_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 3); + let indexing_service_inbox_messages = + indexing_service_inbox.drain_for_test_typed::(); + assert_eq!(indexing_service_inbox_messages.len(), 1); + universe.assert_quit().await; +} + +#[tokio::test] +async fn test_scheduler_scheduling_no_indexer() { + quickwit_common::setup_logging_for_tests(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) + .await + .unwrap(); + let universe = Universe::with_accelerated_time(); + let (indexing_service_inboxes, scheduler_handler) = + start_control_plane(cluster.clone(), &[], &universe).await; + assert_eq!(indexing_service_inboxes.len(), 0); + + // No indexer. + universe.sleep(CONTROL_PLAN_LOOP_INTERVAL).await; + let scheduler_state = scheduler_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); + assert!(scheduler_state.last_applied_physical_plan.is_none()); + + // Wait REFRESH_PLAN_LOOP_INTERVAL * 2, as there is no indexer, we should observe no + // scheduling. 
+ universe.sleep(REFRESH_PLAN_LOOP_INTERVAL * 2).await; + let scheduler_state = scheduler_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); + assert!(scheduler_state.last_applied_physical_plan.is_none()); + universe.assert_quit().await; +} + +#[tokio::test] +async fn test_scheduler_scheduling_multiple_indexers() { + quickwit_common::setup_logging_for_tests(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) + .await + .unwrap(); + let cluster_indexer_1 = create_cluster_for_test( + vec![cluster.gossip_advertise_addr().to_string()], + &["indexer"], + &transport, + true, + ) + .await + .unwrap(); + let cluster_indexer_2 = create_cluster_for_test( + vec![cluster.gossip_advertise_addr().to_string()], + &["indexer"], + &transport, + true, + ) + .await + .unwrap(); + let universe = Universe::new(); + let (indexing_service_inboxes, scheduler_handler) = start_control_plane( + cluster.clone(), + &[&cluster_indexer_1, &cluster_indexer_2], + &universe, + ) + .await; + let indexing_service_inbox_1 = indexing_service_inboxes[0].clone(); + let indexing_service_inbox_2 = indexing_service_inboxes[1].clone(); + let scheduler_handler_arc = Arc::new(scheduler_handler); + + // No indexer. + let scheduler_state = scheduler_handler_arc + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + let indexing_service_inbox_messages = + indexing_service_inbox_1.drain_for_test_typed::(); + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); + assert!(scheduler_state.last_applied_physical_plan.is_none()); + assert_eq!(indexing_service_inbox_messages.len(), 0); + + cluster + .wait_for_ready_members( + |members| { + members + .iter() + .any(|member| member.enabled_services.contains(&QuickwitService::Indexer)) + }, + Duration::from_secs(5), + ) + .await + .unwrap(); + + // Wait for chitchat update, sheduler will detect new indexers and schedule a plan. + wait_until_predicate( + || { + let scheduler_handler_arc_clone = scheduler_handler_arc.clone(); + async move { + let scheduler_state = scheduler_handler_arc_clone + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + scheduler_state.num_schedule_indexing_plan == 1 + } + }, + CONTROL_PLAN_LOOP_INTERVAL * 4, + Duration::from_millis(100), + ) + .await + .unwrap(); + let scheduler_state = scheduler_handler_arc + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); + let indexing_service_inbox_messages_1 = + indexing_service_inbox_1.drain_for_test_typed::(); + let indexing_service_inbox_messages_2 = + indexing_service_inbox_2.drain_for_test_typed::(); + assert_eq!(indexing_service_inbox_messages_1.len(), 1); + assert_eq!(indexing_service_inbox_messages_2.len(), 1); + cluster_indexer_1 + .update_self_node_indexing_tasks(&indexing_service_inbox_messages_1[0].indexing_tasks) + .await + .unwrap(); + cluster_indexer_2 + .update_self_node_indexing_tasks(&indexing_service_inbox_messages_2[0].indexing_tasks) + .await + .unwrap(); + + // Wait 2 CONTROL_PLAN_LOOP_INTERVAL again and check the scheduler will not apply the plan + // several times. 
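
The wait that follows relies on `quickwit_common::test_utils::wait_until_predicate`, which, as it is called throughout this file, appears to poll an async boolean predicate at a fixed interval until it holds or a timeout expires, returning an error on timeout. A minimal usage sketch under that assumption:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use quickwit_common::test_utils::wait_until_predicate;

    #[tokio::test]
    async fn wait_for_background_counter() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();
        // A background task that eventually satisfies the predicate.
        tokio::spawn(async move {
            for _ in 0..3 {
                tokio::time::sleep(Duration::from_millis(50)).await;
                counter_clone.fetch_add(1, Ordering::Relaxed);
            }
        });
        // Poll every 100ms, for at most 2 seconds, until the counter reaches 3.
        wait_until_predicate(
            || {
                let counter = counter.clone();
                async move { counter.load(Ordering::Relaxed) >= 3 }
            },
            Duration::from_secs(2),
            Duration::from_millis(100),
        )
        .await
        .unwrap();
    }
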
+ universe.sleep(CONTROL_PLAN_LOOP_INTERVAL * 2).await; + let scheduler_state = scheduler_handler_arc + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); + + // Shutdown cluster and wait until the new scheduling. + cluster_indexer_2.shutdown().await; + + cluster + .wait_for_ready_members( + |members| { + members + .iter() + .filter(|member| member.enabled_services.contains(&QuickwitService::Indexer)) + .count() + == 1 + }, + Duration::from_secs(5), + ) + .await + .unwrap(); + + wait_until_predicate( + || { + let scheduler_handler_arc_clone = scheduler_handler_arc.clone(); + async move { + let scheduler_state = scheduler_handler_arc_clone + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + scheduler_state.num_schedule_indexing_plan == 2 + } + }, + CONTROL_PLAN_LOOP_INTERVAL * 10, + Duration::from_millis(100), + ) + .await + .unwrap(); + + universe.assert_quit().await; +} diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index d790ed9c8fe..b1679280e21 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -44,7 +44,7 @@ pub struct ControlPlaneMetastore { } impl fmt::Debug for ControlPlaneMetastore { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("ControlPlaneMetastore").finish() } } diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index 38d30d084f5..7d6c7ca25ba 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -58,21 +58,8 @@ service ControlPlaneService { rpc GetOpenShards(GetOpenShardsRequest) returns (GetOpenShardsResponse); rpc CloseShards(CloseShardsRequest) returns (CloseShardsResponse); - - // Notify the Control Plane that a change on an index occurred. The change - // can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update. - // Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification. - // This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the - // type of change. The index ID and/or source ID could also be added. - // However, these attributes will not be used by the Control Plane, at least at short term. 
- rpc NotifyIndexChange(NotifyIndexChangeRequest) returns (NotifyIndexChangeResponse); - } -message NotifyIndexChangeRequest {} - -message NotifyIndexChangeResponse {} - // Shard API message GetOpenShardsRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index d51f4844745..d1d9e815561 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -1,14 +1,6 @@ #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct NotifyIndexChangeRequest {} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct NotifyIndexChangeResponse {} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct GetOpenShardsRequest { #[prost(message, repeated, tag = "1")] pub subrequests: ::prost::alloc::vec::Vec, @@ -109,16 +101,6 @@ pub trait ControlPlaneService: std::fmt::Debug + dyn_clone::DynClone + Send + Sy &mut self, request: CloseShardsRequest, ) -> crate::control_plane::ControlPlaneResult; - /// Notify the Control Plane that a change on an index occurred. The change - /// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update. - /// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification. - /// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the - /// type of change. The index ID and/or source ID could also be added. - /// However, these attributes will not be used by the Control Plane, at least at short term. 
- async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult; } dyn_clone::clone_trait_object!(ControlPlaneService); #[cfg(any(test, feature = "testsuite"))] @@ -231,12 +213,6 @@ impl ControlPlaneService for ControlPlaneServiceClient { ) -> crate::control_plane::ControlPlaneResult { self.inner.close_shards(request).await } - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner.notify_index_change(request).await - } } #[cfg(any(test, feature = "testsuite"))] pub mod control_plane_service_mock { @@ -299,12 +275,6 @@ pub mod control_plane_service_mock { ) -> crate::control_plane::ControlPlaneResult { self.inner.lock().await.close_shards(request).await } - async fn notify_index_change( - &mut self, - request: super::NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner.lock().await.notify_index_change(request).await - } } impl From for ControlPlaneServiceClient { fn from(mock: MockControlPlaneService) -> Self { @@ -435,22 +405,6 @@ impl tower::Service for Box { Box::pin(fut) } } -impl tower::Service for Box { - type Response = NotifyIndexChangeResponse; - type Error = crate::control_plane::ControlPlaneError; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } - fn call(&mut self, request: NotifyIndexChangeRequest) -> Self::Future { - let mut svc = self.clone(); - let fut = async move { svc.notify_index_change(request).await }; - Box::pin(fut) - } -} /// A tower block is a set of towers. Each tower is stack of layers (middlewares) that are applied to a service. 
#[derive(Debug)] struct ControlPlaneServiceTowerBlock { @@ -489,11 +443,6 @@ struct ControlPlaneServiceTowerBlock { CloseShardsResponse, crate::control_plane::ControlPlaneError, >, - notify_index_change_svc: quickwit_common::tower::BoxService< - NotifyIndexChangeRequest, - NotifyIndexChangeResponse, - crate::control_plane::ControlPlaneError, - >, } impl Clone for ControlPlaneServiceTowerBlock { fn clone(&self) -> Self { @@ -505,7 +454,6 @@ impl Clone for ControlPlaneServiceTowerBlock { delete_source_svc: self.delete_source_svc.clone(), get_open_shards_svc: self.get_open_shards_svc.clone(), close_shards_svc: self.close_shards_svc.clone(), - notify_index_change_svc: self.notify_index_change_svc.clone(), } } } @@ -555,12 +503,6 @@ impl ControlPlaneService for ControlPlaneServiceTowerBlock { ) -> crate::control_plane::ControlPlaneResult { self.close_shards_svc.ready().await?.call(request).await } - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.notify_index_change_svc.ready().await?.call(request).await - } } #[derive(Debug, Default)] pub struct ControlPlaneServiceTowerBlockBuilder { @@ -627,15 +569,6 @@ pub struct ControlPlaneServiceTowerBlockBuilder { crate::control_plane::ControlPlaneError, >, >, - #[allow(clippy::type_complexity)] - notify_index_change_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - NotifyIndexChangeRequest, - NotifyIndexChangeResponse, - crate::control_plane::ControlPlaneError, - >, - >, } impl ControlPlaneServiceTowerBlockBuilder { pub fn shared_layer(mut self, layer: L) -> Self @@ -693,12 +626,6 @@ impl ControlPlaneServiceTowerBlockBuilder { Error = crate::control_plane::ControlPlaneError, > + Clone + Send + Sync + 'static, >::Future: Send + 'static, - L::Service: tower::Service< - NotifyIndexChangeRequest, - Response = NotifyIndexChangeResponse, - Error = crate::control_plane::ControlPlaneError, - > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, { self .create_index_layer = Some( @@ -724,14 +651,7 @@ impl ControlPlaneServiceTowerBlockBuilder { .get_open_shards_layer = Some( quickwit_common::tower::BoxLayer::new(layer.clone()), ); - self - .close_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .notify_index_change_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.close_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn create_index_layer(mut self, layer: L) -> Self @@ -835,22 +755,6 @@ impl ControlPlaneServiceTowerBlockBuilder { self.close_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn notify_index_change_layer(mut self, layer: L) -> Self - where - L: tower::Layer> + Send + Sync + 'static, - L::Service: tower::Service< - NotifyIndexChangeRequest, - Response = NotifyIndexChangeResponse, - Error = crate::control_plane::ControlPlaneError, - > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - { - self - .notify_index_change_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); - self - } pub fn build(self, instance: T) -> ControlPlaneServiceClient where T: ControlPlaneService, @@ -924,12 +828,6 @@ impl ControlPlaneServiceTowerBlockBuilder { } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) }; - let notify_index_change_svc = if let Some(layer) = self.notify_index_change_layer - { - layer.layer(boxed_instance.clone()) - } else { - 
quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; let tower_block = ControlPlaneServiceTowerBlock { create_index_svc, delete_index_svc, @@ -938,7 +836,6 @@ impl ControlPlaneServiceTowerBlockBuilder { delete_source_svc, get_open_shards_svc, close_shards_svc, - notify_index_change_svc, }; ControlPlaneServiceClient::new(tower_block) } @@ -1077,15 +974,6 @@ where CloseShardsResponse, crate::control_plane::ControlPlaneError, >, - > - + tower::Service< - NotifyIndexChangeRequest, - Response = NotifyIndexChangeResponse, - Error = crate::control_plane::ControlPlaneError, - Future = BoxFuture< - NotifyIndexChangeResponse, - crate::control_plane::ControlPlaneError, - >, >, { async fn create_index( @@ -1132,12 +1020,6 @@ where ) -> crate::control_plane::ControlPlaneResult { self.call(request).await } - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.call(request).await - } } #[derive(Debug, Clone)] pub struct ControlPlaneServiceGrpcClientAdapter { @@ -1233,16 +1115,6 @@ where .map(|response| response.into_inner()) .map_err(|error| error.into()) } - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner - .notify_index_change(request) - .await - .map(|response| response.into_inner()) - .map_err(|error| error.into()) - } } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServerAdapter { @@ -1336,17 +1208,6 @@ for ControlPlaneServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(|error| error.into()) } - async fn notify_index_change( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - self.inner - .clone() - .notify_index_change(request.into_inner()) - .await - .map(tonic::Response::new) - .map_err(|error| error.into()) - } } /// Generated client implementations. pub mod control_plane_service_grpc_client { @@ -1656,42 +1517,6 @@ pub mod control_plane_service_grpc_client { ); self.inner.unary(req, path, codec).await } - /// Notify the Control Plane that a change on an index occurred. The change - /// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update. - /// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification. - /// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the - /// type of change. The index ID and/or source ID could also be added. - /// However, these attributes will not be used by the Control Plane, at least at short term. - pub async fn notify_index_change( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.control_plane.ControlPlaneService/NotifyIndexChange", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "quickwit.control_plane.ControlPlaneService", - "NotifyIndexChange", - ), - ); - self.inner.unary(req, path, codec).await - } } } /// Generated server implementations. 
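
The tower-block plumbing removed above repeats one pattern per RPC: box the service, optionally wrap it in a `BoxLayer`, then drive it with `ready()` followed by `call()`. A standalone sketch of that layer-over-service idea using tower's public API (assuming tokio and tower with its `timeout` feature as dependencies; the timeout layer is an arbitrary stand-in, not what Quickwit's generated code installs):

    use std::convert::Infallible;
    use std::time::Duration;

    use tower::{service_fn, Service, ServiceBuilder, ServiceExt};

    #[tokio::main]
    async fn main() -> Result<(), tower::BoxError> {
        // Inner service: answers a String request with a String response.
        let inner = service_fn(|request: String| async move {
            Ok::<_, Infallible>(format!("handled: {request}"))
        });
        // Stack a single middleware layer on top of it.
        let mut svc = ServiceBuilder::new()
            .timeout(Duration::from_secs(1))
            .service(inner);
        // Same readiness-then-call dance as the generated adapters.
        let response = svc.ready().await?.call("ping".to_string()).await?;
        assert_eq!(response, "handled: ping");
        Ok(())
    }
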
@@ -1757,19 +1582,6 @@ pub mod control_plane_service_grpc_server { tonic::Response, tonic::Status, >; - /// Notify the Control Plane that a change on an index occurred. The change - /// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update. - /// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification. - /// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the - /// type of change. The index ID and/or source ID could also be added. - /// However, these attributes will not be used by the Control Plane, at least at short term. - async fn notify_index_change( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServer { @@ -2186,52 +1998,6 @@ pub mod control_plane_service_grpc_server { }; Box::pin(fut) } - "/quickwit.control_plane.ControlPlaneService/NotifyIndexChange" => { - #[allow(non_camel_case_types)] - struct NotifyIndexChangeSvc(pub Arc); - impl< - T: ControlPlaneServiceGrpc, - > tonic::server::UnaryService - for NotifyIndexChangeSvc { - type Response = super::NotifyIndexChangeResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - (*inner).notify_index_change(request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = NotifyIndexChangeSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-proto/src/metastore/events.rs b/quickwit/quickwit-proto/src/metastore/events.rs index b3ec894db10..94c2b1fe263 100644 --- a/quickwit/quickwit-proto/src/metastore/events.rs +++ b/quickwit/quickwit-proto/src/metastore/events.rs @@ -22,12 +22,6 @@ use quickwit_common::pubsub::Event; use super::SourceType; use crate::{IndexUid, SourceId}; -/// Create index event. -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct CreateIndexEvent { - pub index_uid: IndexUid, -} - /// Delete index event. 
#[derive(Debug, Clone, Eq, PartialEq)] pub struct DeleteIndexEvent { @@ -67,7 +61,6 @@ pub struct DeleteSourceEvent { } impl Event for AddSourceEvent {} -impl Event for CreateIndexEvent {} impl Event for DeleteIndexEvent {} impl Event for DeleteSourceEvent {} impl Event for ToggleSourceEvent {} diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 89a60103880..f719d409df8 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -63,7 +63,7 @@ use quickwit_common::tower::{ use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; use quickwit_control_plane::control_plane::ControlPlane; -use quickwit_control_plane::{ControlPlaneEventSubscriber, IndexerNodeInfo, IndexerPool}; +use quickwit_control_plane::{IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; use quickwit_indexing::actors::IndexingService; use quickwit_indexing::start_indexing_service; @@ -81,9 +81,6 @@ use quickwit_proto::control_plane::ControlPlaneServiceClient; use quickwit_proto::indexing::IndexingServiceClient; use quickwit_proto::ingest::ingester::IngesterServiceClient; use quickwit_proto::ingest::router::IngestRouterServiceClient; -use quickwit_proto::metastore::events::{ - AddSourceEvent, DeleteIndexEvent, DeleteSourceEvent, ToggleSourceEvent, -}; use quickwit_proto::metastore::{EntityKind, MetastoreError}; use quickwit_proto::search::ReportSplitsRequest; use quickwit_proto::NodeId; @@ -134,7 +131,6 @@ struct QuickwitServices { /// The control plane listens to metastore events. /// We must maintain a reference to the subscription handles to continue receiving /// notifications. Otherwise, the subscriptions are dropped. - _control_plane_event_subscription_handles_opt: Option, _report_splits_subscription_handle_opt: Option>, } @@ -312,13 +308,6 @@ pub async fn serve_quickwit( ) .await?; - // Setup control plane event subscriptions. - let control_plane_event_subscription_handles_opt = setup_control_plane_event_subscriptions( - &node_config, - &event_broker, - &control_plane_service, - ); - // Set up the "control plane proxy" for the metastore. 
let metastore_through_control_plane: Arc = Arc::new(ControlPlaneMetastore::new( control_plane_service.clone(), @@ -457,7 +446,6 @@ pub async fn serve_quickwit( metastore_server_opt, metastore_client: metastore_through_control_plane.clone(), control_plane_service, - _control_plane_event_subscription_handles_opt: control_plane_event_subscription_handles_opt, _report_splits_subscription_handle_opt: report_splits_subscription_handle_opt, index_manager, indexing_service_opt, @@ -541,43 +529,6 @@ pub async fn serve_quickwit( Ok(actor_exit_statuses) } -#[allow(dead_code)] -#[derive(Debug)] -struct ControlPlaneEventSubscriptionHandles { - delete_index_event_subscription_handle: EventSubscriptionHandle, - add_source_event_subscription_handle: EventSubscriptionHandle, - toggle_source_event_subscription_handle: EventSubscriptionHandle, - delete_source_event_subscription_handle: EventSubscriptionHandle, -} - -fn setup_control_plane_event_subscriptions( - config: &NodeConfig, - event_broker: &EventBroker, - control_plane_service: &ControlPlaneServiceClient, -) -> Option { - if !config.is_service_enabled(QuickwitService::Metastore) { - return None; - } - let control_plane_event_subscriber = - ControlPlaneEventSubscriber::new(control_plane_service.clone()); - - let delete_index_event_subscription_handle = - event_broker.subscribe::(control_plane_event_subscriber.clone()); - let add_source_event_subscription_handle = - event_broker.subscribe::(control_plane_event_subscriber.clone()); - let toggle_source_event_subscription_handle = - event_broker.subscribe::(control_plane_event_subscriber.clone()); - let delete_source_event_subscription_handle = - event_broker.subscribe::(control_plane_event_subscriber); - let control_plane_subscription_handles = ControlPlaneEventSubscriptionHandles { - delete_index_event_subscription_handle, - add_source_event_subscription_handle, - toggle_source_event_subscription_handle, - delete_source_event_subscription_handle, - }; - Some(control_plane_subscription_handles) -} - async fn setup_ingest_v2( config: &NodeConfig, cluster: &Cluster,