diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index f38de7a6ef1..5128f3812ec 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -54,19 +54,6 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur Duration::from_secs(3) }; -/// Interval between two scheduling of indexing plans. No need to be faster than the -/// control plan loop. -// Note: it's currently not possible to define a const duration with -// `CONTROL_PLAN_LOOP_INTERVAL * number`. -pub(crate) const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { - Duration::from_secs(3) -} else { - Duration::from_secs(60) -}; - -#[derive(Debug)] -struct RefreshPlanLoop; - #[derive(Debug)] struct ControlPlanLoop; @@ -134,7 +121,15 @@ impl Actor for ControlPlane { .await .context("failed to initialize ingest controller")?; - self.handle(RefreshPlanLoop, ctx).await?; + if let Err(error) = self + .indexing_scheduler + .schedule_indexing_plan_if_needed() + .await + { + // TODO inspect error. + error!("Error when scheduling indexing plan: `{}`.", error); + } + ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) .await; @@ -218,28 +213,6 @@ impl Handler for ControlPlane { } } -#[async_trait] -impl Handler for ControlPlane { - type Reply = (); - - async fn handle( - &mut self, - _message: RefreshPlanLoop, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - if let Err(error) = self - .indexing_scheduler - .schedule_indexing_plan_if_needed() - .await - { - error!("Error when scheduling indexing plan: `{}`.", error); - } - ctx.schedule_self_msg(REFRESH_PLAN_LOOP_INTERVAL, RefreshPlanLoop) - .await; - Ok(()) - } -} - #[async_trait] impl Handler for ControlPlane { type Reply = ControlPlaneResult; @@ -331,6 +304,7 @@ impl Handler for ControlPlane { self.ingest_controller .delete_source(&index_uid, &request.source_id); + self.indexing_scheduler.on_index_change().await?; let response = EmptyResponse {}; Ok(Ok(response)) diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index 7ed4f4521b3..9d910457dc9 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -37,7 +37,7 @@ use quickwit_proto::metastore::ListShardsResponse; use quickwit_proto::NodeId; use serde_json::json; -use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL, REFRESH_PLAN_LOOP_INTERVAL}; +use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL}; use crate::scheduler::MIN_DURATION_BETWEEN_SCHEDULING; use crate::IndexerNodeInfo; @@ -270,9 +270,9 @@ async fn test_scheduler_scheduling_no_indexer() { assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); assert!(scheduler_state.last_applied_physical_plan.is_none()); - // Wait REFRESH_PLAN_LOOP_INTERVAL * 2, as there is no indexer, we should observe no + // There is no indexer, we should observe no // scheduling. - universe.sleep(REFRESH_PLAN_LOOP_INTERVAL * 2).await; + universe.sleep(Duration::from_secs(60)).await; let scheduler_state = scheduler_handler .process_pending_and_observe() .await