diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index ea16e4ef796..9ff1a246f6b 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -43,6 +43,7 @@ use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMet use quickwit_proto::control_plane::{ AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest, + RebuildPlanRequest, RebuildPlanResponse, }; use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::metastore::{ @@ -77,9 +78,6 @@ const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2); #[derive(Debug)] struct ControlPlanLoop; -#[derive(Debug, Default)] -struct RebuildPlan; - pub struct ControlPlane { cluster_config: ClusterConfig, cluster_change_stream_opt: Option, @@ -384,22 +382,24 @@ impl ControlPlane { .next_rebuild_tracker .next_rebuild_waiter(); self.rebuild_plan_debouncer - .self_send_with_cooldown::(ctx); + .self_send_with_cooldown::(ctx); next_rebuild_waiter } } #[async_trait] -impl Handler for ControlPlane { - type Reply = (); +impl Handler for ControlPlane { + type Reply = ControlPlaneResult; async fn handle( &mut self, - _message: RebuildPlan, + rebuild_plan_request: RebuildPlanRequest, _ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - self.indexing_scheduler.rebuild_plan(&self.model); - Ok(()) + ) -> Result, ActorExitStatus> { + let rebuild_plan_response = self + .indexing_scheduler + .rebuild_plan(&self.model, rebuild_plan_request); + Ok(Ok(rebuild_plan_response)) } } @@ -925,7 +925,8 @@ impl Handler for ControlPlane { { return convert_metastore_error::<()>(metastore_error).map(|_| ()); } - self.indexing_scheduler.rebuild_plan(&self.model); + self.indexing_scheduler + .rebuild_plan(&self.model, RebuildPlanRequest::default()); Ok(()) } } @@ -955,7 +956,8 @@ impl Handler for ControlPlane { { return convert_metastore_error::<()>(metastore_error).map(|_| ()); } - self.indexing_scheduler.rebuild_plan(&self.model); + self.indexing_scheduler + .rebuild_plan(&self.model, RebuildPlanRequest::default()); Ok(()) } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 0fc16189925..06476b692c9 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -28,6 +28,7 @@ use std::time::{Duration, Instant}; use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; +use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse}; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, }; @@ -205,7 +206,12 @@ impl IndexingScheduler { // // Prefer not calling this method directly, and instead call // `ControlPlane::rebuild_indexing_plan_debounced`. - pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) { + pub(crate) fn rebuild_plan( + &mut self, + model: &ControlPlaneModel, + rebuild_plan_request: RebuildPlanRequest, + ) -> RebuildPlanResponse { + let RebuildPlanRequest { reset, debug } = rebuild_plan_request; crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc(); let notify_on_drop = self.next_rebuild_tracker.start_rebuild(); @@ -214,6 +220,8 @@ impl IndexingScheduler { let indexers: Vec = self.get_indexers_from_indexer_pool(); + let mut rebuild_plan_response = RebuildPlanResponse::default(); + let indexer_id_to_cpu_capacities: FnvHashMap = indexers .iter() .filter_map(|indexer| { @@ -229,15 +237,28 @@ impl IndexingScheduler { if !sources.is_empty() { warn!("no indexing capacity available, cannot schedule an indexing plan"); } - return; + return rebuild_plan_response; }; let shard_locations = model.shard_locations(); + let previous_applied_plan = if reset { + None + } else { + self.state.last_applied_physical_plan.as_ref() + }; + + let debug_output: Option<&mut RebuildPlanResponse> = if debug { + Some(&mut rebuild_plan_response) + } else { + None + }; + let new_physical_plan = build_physical_indexing_plan( &sources, &indexer_id_to_cpu_capacities, - self.state.last_applied_physical_plan.as_ref(), + previous_applied_plan, &shard_locations, + debug_output, ); let shard_locality_metrics = get_shard_locality_metrics(&new_physical_plan, &shard_locations); @@ -249,11 +270,12 @@ impl IndexingScheduler { ); // No need to apply the new plan as it is the same as the old one. if plans_diff.is_empty() { - return; + return rebuild_plan_response; } } self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop)); self.state.num_schedule_indexing_plan += 1; + rebuild_plan_response } /// Checks if the last applied plan corresponds to the running indexing tasks present in the @@ -268,7 +290,7 @@ impl IndexingScheduler { // If there is no plan, the node is probably starting and the scheduler did not find // indexers yet. In this case, we want to schedule as soon as possible to find new // indexers. - self.rebuild_plan(model); + self.rebuild_plan(model, RebuildPlanRequest::default()); return; }; if let Some(last_applied_plan_timestamp) = self.state.last_applied_plan_timestamp { @@ -290,7 +312,7 @@ impl IndexingScheduler { ); if !indexing_plans_diff.has_same_nodes() { info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan"); - self.rebuild_plan(model); + let _ = self.rebuild_plan(model, RebuildPlanRequest::default()); } else if !indexing_plans_diff.has_same_tasks() { // Some nodes may have not received their tasks, apply it again. info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan"); @@ -833,8 +855,13 @@ mod tests { indexer_max_loads.insert("indexer1".to_string(), mcpu(3_000)); indexer_max_loads.insert("indexer2".to_string(), mcpu(3_000)); let shard_locations = ShardLocations::default(); - let physical_plan = - build_physical_indexing_plan(&sources[..], &indexer_max_loads, None, &shard_locations); + let physical_plan = build_physical_indexing_plan( + &sources[..], + &indexer_max_loads, + None, + &shard_locations, + None, + ); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 2); let indexing_tasks_1 = physical_plan.indexer("indexer1").unwrap(); assert_eq!(indexing_tasks_1.len(), 2); @@ -865,7 +892,7 @@ mod tests { indexer_max_loads.insert(indexer_id, mcpu(4_000)); } let shard_locations = ShardLocations::default(); - let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); + let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations, None); } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index ead4509f4aa..19b9a76d5d4 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -25,6 +25,7 @@ use std::num::NonZeroU32; use fnv::{FnvHashMap, FnvHashSet}; use quickwit_common::rate_limited_debug; +use quickwit_proto::control_plane::RebuildPlanResponse; use quickwit_proto::indexing::{CpuCapacity, IndexingTask}; use quickwit_proto::types::{PipelineUid, ShardId, SourceUid}; use scheduling_logic_model::{IndexerOrd, SourceOrd}; @@ -609,6 +610,7 @@ pub fn build_physical_indexing_plan( indexer_id_to_cpu_capacities: &FnvHashMap, previous_plan_opt: Option<&PhysicalIndexingPlan>, shard_locations: &ShardLocations, + mut debug_output_opt: Option<&mut RebuildPlanResponse>, ) -> PhysicalIndexingPlan { // Asserts that the source are valid. check_sources(sources); @@ -623,13 +625,23 @@ pub fn build_physical_indexing_plan( // Populate the previous solution, if any. let mut previous_solution = problem.new_solution(); + if let Some(previous_plan) = previous_plan_opt { convert_physical_plan_to_solution(previous_plan, &id_to_ord_map, &mut previous_solution); } + if let Some(debug_output) = &mut debug_output_opt { + debug_output.previous_solution_json = serde_json::to_string(&previous_solution).unwrap(); + debug_output.problem_json = serde_json::to_string(&problem).unwrap(); + } + // Compute the new scheduling solution using a heuristic. let new_solution = scheduling_logic::solve(problem, previous_solution); + if let Some(debug_output) = debug_output_opt { + debug_output.new_solution_json = serde_json::to_string(&new_solution).unwrap(); + } + // Convert the new scheduling solution back to a physical plan. let new_physical_plan = convert_scheduling_solution_to_physical_plan( &new_solution, @@ -778,6 +790,7 @@ mod tests { &indexer_id_to_cpu_capacities, None, &shard_locations, + None, ); assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2); @@ -848,6 +861,7 @@ mod tests { &indexer_id_to_cpu_capacities, None, &shard_locations, + None, ); assert_eq!(plan.indexing_tasks_per_indexer().len(), num_indexers); let metrics = get_shard_locality_metrics(&plan, &shard_locations); @@ -876,8 +890,13 @@ mod tests { { indexer_max_loads.insert(indexer1.clone(), mcpu(1_999)); // This test what happens when there isn't enough capacity on the cluster. - let physical_plan = - build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); + let physical_plan = build_physical_indexing_plan( + &sources, + &indexer_max_loads, + None, + &shard_locations, + None, + ); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1); let expected_tasks = physical_plan.indexer(&indexer1).unwrap(); assert_eq!(expected_tasks.len(), 2); @@ -886,8 +905,13 @@ mod tests { { indexer_max_loads.insert(indexer1.clone(), mcpu(2_000)); // This test what happens when there isn't enough capacity on the cluster. - let physical_plan = - build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); + let physical_plan = build_physical_indexing_plan( + &sources, + &indexer_max_loads, + None, + &shard_locations, + None, + ); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1); let expected_tasks = physical_plan.indexer(&indexer1).unwrap(); assert_eq!(expected_tasks.len(), 2); @@ -955,6 +979,7 @@ mod tests { &indexer_id_to_cpu_capacities, Some(&indexing_plan), &shard_locations, + None, ); let indexing_tasks = new_plan.indexer("node1").unwrap(); assert_eq!(indexing_tasks.len(), 2); @@ -995,6 +1020,7 @@ mod tests { &indexer_id_to_cpu_capacities, Some(&indexing_plan), &shard_locations, + None, ); let mut indexing_tasks = new_plan.indexer(NODE).unwrap().to_vec(); for indexing_task in &mut indexing_tasks { @@ -1177,7 +1203,13 @@ mod tests { let mut capacities = FnvHashMap::default(); capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000)); let shard_locations = ShardLocations::default(); - build_physical_indexing_plan(&sources_to_schedule, &capacities, None, &shard_locations); + build_physical_indexing_plan( + &sources_to_schedule, + &capacities, + None, + &shard_locations, + None, + ); } #[test] diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs index 89d47fc50b4..0e8c64406d5 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs @@ -22,11 +22,12 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use quickwit_proto::indexing::CpuCapacity; +use serde::Serialize; pub type SourceOrd = u32; pub type IndexerOrd = usize; -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] pub struct Source { pub source_ord: SourceOrd, pub load_per_shard: NonZeroU32, @@ -79,7 +80,7 @@ impl Source { } } -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct SchedulingProblem { sources: Vec, indexer_cpu_capacities: Vec, @@ -174,7 +175,7 @@ impl SchedulingProblem { } } -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] pub struct IndexerAssignment { pub indexer_ord: IndexerOrd, pub num_shards_per_source: BTreeMap, @@ -232,7 +233,7 @@ impl IndexerAssignment { } } -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] pub struct SchedulingSolution { pub indexer_assignments: Vec, } diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index 963734cd157..f9ee1dfa9bc 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -68,6 +68,9 @@ service ControlPlaneService { // Asks the control plane whether the shards listed in the request should be deleted or truncated. rpc AdviseResetShards(AdviseResetShardsRequest) returns (AdviseResetShardsResponse); + + /// Compute the indexing plan from scratch, without taking into account the current state of the indexers. + rpc RebuildPlan(RebuildPlanRequest) returns (RebuildPlanResponse); } // Shard API @@ -122,3 +125,17 @@ message AdviseResetShardsResponse { repeated quickwit.ingest.ShardIds shards_to_delete = 1; repeated quickwit.ingest.ShardIdPositions shards_to_truncate = 2; } + +/// Careful here! The Default implementation is used in different place of the code. +/// If you modify this struct make sure, its default's value aligns with the previous +/// behavior. +message RebuildPlanRequest { + bool reset = 1; + bool debug = 2; +} + +message RebuildPlanResponse { + string previous_solution_json = 1; + string problem_json = 2; + string new_solution_json = 3; +} diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 29d91cf6de0..0bc740f3c9a 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -75,6 +75,29 @@ pub struct AdviseResetShardsResponse { #[prost(message, repeated, tag = "2")] pub shards_to_truncate: ::prost::alloc::vec::Vec, } +/// / Careful here! The Default implementation is used in different place of the code. +/// / If you modify this struct make sure, its default's value aligns with the previous +/// / behavior. +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RebuildPlanRequest { + #[prost(bool, tag = "1")] + pub reset: bool, + #[prost(bool, tag = "2")] + pub debug: bool, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RebuildPlanResponse { + #[prost(string, tag = "1")] + pub previous_solution_json: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub problem_json: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub new_solution_json: ::prost::alloc::string::String, +} #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "snake_case")] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] @@ -175,6 +198,11 @@ pub trait ControlPlaneService: std::fmt::Debug + Send + Sync + 'static { &self, request: AdviseResetShardsRequest, ) -> crate::control_plane::ControlPlaneResult; + #[doc = "/ Compute the indexing plan from scratch, without taking into account the current state of the indexers."] + async fn rebuild_plan( + &self, + request: RebuildPlanRequest, + ) -> crate::control_plane::ControlPlaneResult; } #[derive(Debug, Clone)] pub struct ControlPlaneServiceClient { @@ -319,6 +347,12 @@ impl ControlPlaneService for ControlPlaneServiceClient { ) -> crate::control_plane::ControlPlaneResult { self.inner.0.advise_reset_shards(request).await } + async fn rebuild_plan( + &self, + request: RebuildPlanRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.inner.0.rebuild_plan(request).await + } } #[cfg(any(test, feature = "testsuite"))] pub mod mock_control_plane_service { @@ -391,6 +425,12 @@ pub mod mock_control_plane_service { ) -> crate::control_plane::ControlPlaneResult { self.inner.lock().await.advise_reset_shards(request).await } + async fn rebuild_plan( + &self, + request: super::RebuildPlanRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.inner.lock().await.rebuild_plan(request).await + } } } pub type BoxFuture = std::pin::Pin< @@ -530,6 +570,22 @@ impl tower::Service for InnerControlPlaneServiceClient Box::pin(fut) } } +impl tower::Service for InnerControlPlaneServiceClient { + type Response = RebuildPlanResponse; + type Error = crate::control_plane::ControlPlaneError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: RebuildPlanRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.rebuild_plan(request).await }; + Box::pin(fut) + } +} /// A tower service stack is a set of tower services. #[derive(Debug)] struct ControlPlaneServiceTowerServiceStack { @@ -575,6 +631,11 @@ struct ControlPlaneServiceTowerServiceStack { AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >, + rebuild_plan_svc: quickwit_common::tower::BoxService< + RebuildPlanRequest, + RebuildPlanResponse, + crate::control_plane::ControlPlaneError, + >, } #[async_trait::async_trait] impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { @@ -630,6 +691,12 @@ impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { ) -> crate::control_plane::ControlPlaneResult { self.advise_reset_shards_svc.clone().ready().await?.call(request).await } + async fn rebuild_plan( + &self, + request: RebuildPlanRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.rebuild_plan_svc.clone().ready().await?.call(request).await + } } type CreateIndexLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< @@ -711,6 +778,16 @@ type AdviseResetShardsLayer = quickwit_common::tower::BoxLayer< AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >; +type RebuildPlanLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + RebuildPlanRequest, + RebuildPlanResponse, + crate::control_plane::ControlPlaneError, + >, + RebuildPlanRequest, + RebuildPlanResponse, + crate::control_plane::ControlPlaneError, +>; #[derive(Debug, Default)] pub struct ControlPlaneServiceTowerLayerStack { create_index_layers: Vec, @@ -721,6 +798,7 @@ pub struct ControlPlaneServiceTowerLayerStack { delete_source_layers: Vec, get_or_create_open_shards_layers: Vec, advise_reset_shards_layers: Vec, + rebuild_plan_layers: Vec, } impl ControlPlaneServiceTowerLayerStack { pub fn stack_layer(mut self, layer: L) -> Self @@ -939,6 +1017,31 @@ impl ControlPlaneServiceTowerLayerStack { crate::control_plane::ControlPlaneError, >, >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + RebuildPlanRequest, + RebuildPlanResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + RebuildPlanRequest, + Response = RebuildPlanResponse, + Error = crate::control_plane::ControlPlaneError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, { self.create_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); @@ -956,6 +1059,8 @@ impl ControlPlaneServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.advise_reset_shards_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.rebuild_plan_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } pub fn stack_create_index_layer(mut self, layer: L) -> Self @@ -1126,6 +1231,25 @@ impl ControlPlaneServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_rebuild_plan_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + RebuildPlanRequest, + RebuildPlanResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + RebuildPlanRequest, + Response = RebuildPlanResponse, + Error = crate::control_plane::ControlPlaneError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.rebuild_plan_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn build(self, instance: T) -> ControlPlaneServiceClient where T: ControlPlaneService, @@ -1249,6 +1373,14 @@ impl ControlPlaneServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); + let rebuild_plan_svc = self + .rebuild_plan_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); let tower_svc_stack = ControlPlaneServiceTowerServiceStack { inner: inner_client, create_index_svc, @@ -1259,6 +1391,7 @@ impl ControlPlaneServiceTowerLayerStack { delete_source_svc, get_or_create_open_shards_svc, advise_reset_shards_svc, + rebuild_plan_svc, }; ControlPlaneServiceClient::new(tower_svc_stack) } @@ -1406,6 +1539,15 @@ where AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >, + > + + tower::Service< + RebuildPlanRequest, + Response = RebuildPlanResponse, + Error = crate::control_plane::ControlPlaneError, + Future = BoxFuture< + RebuildPlanResponse, + crate::control_plane::ControlPlaneError, + >, >, { async fn create_index( @@ -1460,6 +1602,12 @@ where ) -> crate::control_plane::ControlPlaneResult { self.clone().call(request).await } + async fn rebuild_plan( + &self, + request: RebuildPlanRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.clone().call(request).await + } } #[derive(Debug, Clone)] pub struct ControlPlaneServiceGrpcClientAdapter { @@ -1611,6 +1759,20 @@ where AdviseResetShardsRequest::rpc_name(), )) } + async fn rebuild_plan( + &self, + request: RebuildPlanRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.inner + .clone() + .rebuild_plan(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + RebuildPlanRequest::rpc_name(), + )) + } } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServerAdapter { @@ -1720,6 +1882,17 @@ for ControlPlaneServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn rebuild_plan( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .rebuild_plan(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } } /// Generated client implementations. pub mod control_plane_service_grpc_client { @@ -2061,6 +2234,37 @@ pub mod control_plane_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// / Compute the indexing plan from scratch, without taking into account the current state of the indexers. + pub async fn rebuild_plan( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.control_plane.ControlPlaneService/RebuildPlan", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.control_plane.ControlPlaneService", + "RebuildPlan", + ), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -2135,6 +2339,14 @@ pub mod control_plane_service_grpc_server { tonic::Response, tonic::Status, >; + /// / Compute the indexing plan from scratch, without taking into account the current state of the indexers. + async fn rebuild_plan( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServer { @@ -2602,6 +2814,52 @@ pub mod control_plane_service_grpc_server { }; Box::pin(fut) } + "/quickwit.control_plane.ControlPlaneService/RebuildPlan" => { + #[allow(non_camel_case_types)] + struct RebuildPlanSvc(pub Arc); + impl< + T: ControlPlaneServiceGrpc, + > tonic::server::UnaryService + for RebuildPlanSvc { + type Response = super::RebuildPlanResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).rebuild_plan(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = RebuildPlanSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index 85caadfdb28..a9e98b0294d 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -126,6 +126,12 @@ impl RpcName for AdviseResetShardsRequest { } } +impl RpcName for RebuildPlanRequest { + fn rpc_name() -> &'static str { + "rebuild_plan_request" + } +} + impl GetOrCreateOpenShardsFailureReason { pub fn create_failure( &self, diff --git a/quickwit/quickwit-serve/src/developer_api/mod.rs b/quickwit/quickwit-serve/src/developer_api/mod.rs index 0a4f8a29442..63bb8b6583a 100644 --- a/quickwit/quickwit-serve/src/developer_api/mod.rs +++ b/quickwit/quickwit-serve/src/developer_api/mod.rs @@ -20,12 +20,15 @@ mod debug; mod log_level; mod pprof; +mod rebuild_plan; mod server; use debug::debug_handler; use log_level::log_level_handler; use pprof::pprof_handlers; use quickwit_cluster::Cluster; +use quickwit_proto::control_plane::ControlPlaneServiceClient; +use rebuild_plan::rebuild_plan_handler; pub(crate) use server::DeveloperApiServer; use warp::{Filter, Rejection}; @@ -37,6 +40,7 @@ use crate::EnvFilterReloadFn; pub struct DeveloperApi; pub(crate) fn developer_api_routes( + control_plane_client: ControlPlaneServiceClient, cluster: Cluster, env_filter_reload_fn: EnvFilterReloadFn, ) -> impl Filter + Clone { @@ -44,7 +48,8 @@ pub(crate) fn developer_api_routes( .and( debug_handler(cluster.clone()) .or(log_level_handler(env_filter_reload_fn.clone())) - .or(pprof_handlers()), + .or(pprof_handlers()) + .or(rebuild_plan_handler(control_plane_client)), ) .recover(recover_fn) } diff --git a/quickwit/quickwit-serve/src/developer_api/rebuild_plan.rs b/quickwit/quickwit-serve/src/developer_api/rebuild_plan.rs new file mode 100644 index 00000000000..424101d9630 --- /dev/null +++ b/quickwit/quickwit-serve/src/developer_api/rebuild_plan.rs @@ -0,0 +1,93 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use hyper::StatusCode; +use quickwit_proto::control_plane::{ + ControlPlaneResult, ControlPlaneService, ControlPlaneServiceClient, RebuildPlanRequest, + RebuildPlanResponse, +}; +use serde::{Deserialize, Serialize}; +use tracing::error; +use warp::Filter; + +use crate::with_arg; + +#[derive(Deserialize)] +struct RebuildPlanParams { + #[serde(default)] + reset: bool, +} + +#[derive(Serialize)] +struct RebuildPlanResp { + previous_solution: serde_json::Value, + problem: serde_json::Value, + new_solution: serde_json::Value, +} + +pub fn rebuild_plan_handler( + control_plane_client: ControlPlaneServiceClient, +) -> impl Filter + Clone { + warp::path("rebuild-plan") + .and(warp::get()) + .and(with_arg(control_plane_client)) + .and(warp::query::()) + .then( + |control_plane_client: ControlPlaneServiceClient, + rebuild_plan_params: RebuildPlanParams| async move { + let rebuild_plan_request = RebuildPlanRequest { + reset: rebuild_plan_params.reset, + debug: true, + }; + let rebuild_plan_result: ControlPlaneResult = + control_plane_client + .rebuild_plan(rebuild_plan_request) + .await; + match rebuild_plan_result { + Ok(RebuildPlanResponse { + previous_solution_json, + problem_json, + new_solution_json, + }) => { + let rebuild_plan_resp = RebuildPlanResp { + previous_solution: serde_json::from_str(&previous_solution_json) + .unwrap_or(serde_json::Value::Null), + problem: serde_json::from_str(&problem_json) + .unwrap_or(serde_json::Value::Null), + new_solution: serde_json::from_str(&new_solution_json) + .unwrap_or(serde_json::Value::Null), + }; + warp::reply::with_status( + warp::reply::json(&rebuild_plan_resp), + StatusCode::OK, + ) + } + Err(err) => { + error!("control plane error on rebuild plane: {:?}", err); + warp::reply::with_status( + warp::reply::json( + &serde_json::json!({"error_msg": "error rebuilding plan"}), + ), + StatusCode::INTERNAL_SERVER_ERROR, + ) + } + } + }, + ) +} diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index e2818b35928..e874f21571a 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -159,6 +159,7 @@ pub(crate) async fn start_rest_server( // `/api/developer/*` route. let developer_routes = developer_api_routes( + quickwit_services.control_plane_client.clone(), quickwit_services.cluster.clone(), quickwit_services.env_filter_reload_fn.clone(), );