diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 9ff1a246f6b..ea16e4ef796 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -43,7 +43,6 @@ use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMet use quickwit_proto::control_plane::{ AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest, - RebuildPlanRequest, RebuildPlanResponse, }; use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::metastore::{ @@ -78,6 +77,9 @@ const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2); #[derive(Debug)] struct ControlPlanLoop; +#[derive(Debug, Default)] +struct RebuildPlan; + pub struct ControlPlane { cluster_config: ClusterConfig, cluster_change_stream_opt: Option, @@ -382,24 +384,22 @@ impl ControlPlane { .next_rebuild_tracker .next_rebuild_waiter(); self.rebuild_plan_debouncer - .self_send_with_cooldown::(ctx); + .self_send_with_cooldown::(ctx); next_rebuild_waiter } } #[async_trait] -impl Handler for ControlPlane { - type Reply = ControlPlaneResult; +impl Handler for ControlPlane { + type Reply = (); async fn handle( &mut self, - rebuild_plan_request: RebuildPlanRequest, + _message: RebuildPlan, _ctx: &ActorContext, - ) -> Result, ActorExitStatus> { - let rebuild_plan_response = self - .indexing_scheduler - .rebuild_plan(&self.model, rebuild_plan_request); - Ok(Ok(rebuild_plan_response)) + ) -> Result<(), ActorExitStatus> { + self.indexing_scheduler.rebuild_plan(&self.model); + Ok(()) } } @@ -925,8 +925,7 @@ impl Handler for ControlPlane { { return convert_metastore_error::<()>(metastore_error).map(|_| ()); } - self.indexing_scheduler - .rebuild_plan(&self.model, RebuildPlanRequest::default()); + self.indexing_scheduler.rebuild_plan(&self.model); Ok(()) } } @@ -956,8 +955,7 @@ impl Handler for ControlPlane { { return convert_metastore_error::<()>(metastore_error).map(|_| ()); } - self.indexing_scheduler - .rebuild_plan(&self.model, RebuildPlanRequest::default()); + self.indexing_scheduler.rebuild_plan(&self.model); Ok(()) } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 06476b692c9..0fc16189925 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -28,7 +28,6 @@ use std::time::{Duration, Instant}; use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; -use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse}; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, }; @@ -206,12 +205,7 @@ impl IndexingScheduler { // // Prefer not calling this method directly, and instead call // `ControlPlane::rebuild_indexing_plan_debounced`. - pub(crate) fn rebuild_plan( - &mut self, - model: &ControlPlaneModel, - rebuild_plan_request: RebuildPlanRequest, - ) -> RebuildPlanResponse { - let RebuildPlanRequest { reset, debug } = rebuild_plan_request; + pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) { crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc(); let notify_on_drop = self.next_rebuild_tracker.start_rebuild(); @@ -220,8 +214,6 @@ impl IndexingScheduler { let indexers: Vec = self.get_indexers_from_indexer_pool(); - let mut rebuild_plan_response = RebuildPlanResponse::default(); - let indexer_id_to_cpu_capacities: FnvHashMap = indexers .iter() .filter_map(|indexer| { @@ -237,28 +229,15 @@ impl IndexingScheduler { if !sources.is_empty() { warn!("no indexing capacity available, cannot schedule an indexing plan"); } - return rebuild_plan_response; + return; }; let shard_locations = model.shard_locations(); - let previous_applied_plan = if reset { - None - } else { - self.state.last_applied_physical_plan.as_ref() - }; - - let debug_output: Option<&mut RebuildPlanResponse> = if debug { - Some(&mut rebuild_plan_response) - } else { - None - }; - let new_physical_plan = build_physical_indexing_plan( &sources, &indexer_id_to_cpu_capacities, - previous_applied_plan, + self.state.last_applied_physical_plan.as_ref(), &shard_locations, - debug_output, ); let shard_locality_metrics = get_shard_locality_metrics(&new_physical_plan, &shard_locations); @@ -270,12 +249,11 @@ impl IndexingScheduler { ); // No need to apply the new plan as it is the same as the old one. if plans_diff.is_empty() { - return rebuild_plan_response; + return; } } self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop)); self.state.num_schedule_indexing_plan += 1; - rebuild_plan_response } /// Checks if the last applied plan corresponds to the running indexing tasks present in the @@ -290,7 +268,7 @@ impl IndexingScheduler { // If there is no plan, the node is probably starting and the scheduler did not find // indexers yet. In this case, we want to schedule as soon as possible to find new // indexers. - self.rebuild_plan(model, RebuildPlanRequest::default()); + self.rebuild_plan(model); return; }; if let Some(last_applied_plan_timestamp) = self.state.last_applied_plan_timestamp { @@ -312,7 +290,7 @@ impl IndexingScheduler { ); if !indexing_plans_diff.has_same_nodes() { info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan"); - let _ = self.rebuild_plan(model, RebuildPlanRequest::default()); + self.rebuild_plan(model); } else if !indexing_plans_diff.has_same_tasks() { // Some nodes may have not received their tasks, apply it again. info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan"); @@ -855,13 +833,8 @@ mod tests { indexer_max_loads.insert("indexer1".to_string(), mcpu(3_000)); indexer_max_loads.insert("indexer2".to_string(), mcpu(3_000)); let shard_locations = ShardLocations::default(); - let physical_plan = build_physical_indexing_plan( - &sources[..], - &indexer_max_loads, - None, - &shard_locations, - None, - ); + let physical_plan = + build_physical_indexing_plan(&sources[..], &indexer_max_loads, None, &shard_locations); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 2); let indexing_tasks_1 = physical_plan.indexer("indexer1").unwrap(); assert_eq!(indexing_tasks_1.len(), 2); @@ -892,7 +865,7 @@ mod tests { indexer_max_loads.insert(indexer_id, mcpu(4_000)); } let shard_locations = ShardLocations::default(); - let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations, None); + let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index 19b9a76d5d4..ead4509f4aa 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -25,7 +25,6 @@ use std::num::NonZeroU32; use fnv::{FnvHashMap, FnvHashSet}; use quickwit_common::rate_limited_debug; -use quickwit_proto::control_plane::RebuildPlanResponse; use quickwit_proto::indexing::{CpuCapacity, IndexingTask}; use quickwit_proto::types::{PipelineUid, ShardId, SourceUid}; use scheduling_logic_model::{IndexerOrd, SourceOrd}; @@ -610,7 +609,6 @@ pub fn build_physical_indexing_plan( indexer_id_to_cpu_capacities: &FnvHashMap, previous_plan_opt: Option<&PhysicalIndexingPlan>, shard_locations: &ShardLocations, - mut debug_output_opt: Option<&mut RebuildPlanResponse>, ) -> PhysicalIndexingPlan { // Asserts that the source are valid. check_sources(sources); @@ -625,23 +623,13 @@ pub fn build_physical_indexing_plan( // Populate the previous solution, if any. let mut previous_solution = problem.new_solution(); - if let Some(previous_plan) = previous_plan_opt { convert_physical_plan_to_solution(previous_plan, &id_to_ord_map, &mut previous_solution); } - if let Some(debug_output) = &mut debug_output_opt { - debug_output.previous_solution_json = serde_json::to_string(&previous_solution).unwrap(); - debug_output.problem_json = serde_json::to_string(&problem).unwrap(); - } - // Compute the new scheduling solution using a heuristic. let new_solution = scheduling_logic::solve(problem, previous_solution); - if let Some(debug_output) = debug_output_opt { - debug_output.new_solution_json = serde_json::to_string(&new_solution).unwrap(); - } - // Convert the new scheduling solution back to a physical plan. let new_physical_plan = convert_scheduling_solution_to_physical_plan( &new_solution, @@ -790,7 +778,6 @@ mod tests { &indexer_id_to_cpu_capacities, None, &shard_locations, - None, ); assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2); @@ -861,7 +848,6 @@ mod tests { &indexer_id_to_cpu_capacities, None, &shard_locations, - None, ); assert_eq!(plan.indexing_tasks_per_indexer().len(), num_indexers); let metrics = get_shard_locality_metrics(&plan, &shard_locations); @@ -890,13 +876,8 @@ mod tests { { indexer_max_loads.insert(indexer1.clone(), mcpu(1_999)); // This test what happens when there isn't enough capacity on the cluster. - let physical_plan = build_physical_indexing_plan( - &sources, - &indexer_max_loads, - None, - &shard_locations, - None, - ); + let physical_plan = + build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1); let expected_tasks = physical_plan.indexer(&indexer1).unwrap(); assert_eq!(expected_tasks.len(), 2); @@ -905,13 +886,8 @@ mod tests { { indexer_max_loads.insert(indexer1.clone(), mcpu(2_000)); // This test what happens when there isn't enough capacity on the cluster. - let physical_plan = build_physical_indexing_plan( - &sources, - &indexer_max_loads, - None, - &shard_locations, - None, - ); + let physical_plan = + build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1); let expected_tasks = physical_plan.indexer(&indexer1).unwrap(); assert_eq!(expected_tasks.len(), 2); @@ -979,7 +955,6 @@ mod tests { &indexer_id_to_cpu_capacities, Some(&indexing_plan), &shard_locations, - None, ); let indexing_tasks = new_plan.indexer("node1").unwrap(); assert_eq!(indexing_tasks.len(), 2); @@ -1020,7 +995,6 @@ mod tests { &indexer_id_to_cpu_capacities, Some(&indexing_plan), &shard_locations, - None, ); let mut indexing_tasks = new_plan.indexer(NODE).unwrap().to_vec(); for indexing_task in &mut indexing_tasks { @@ -1203,13 +1177,7 @@ mod tests { let mut capacities = FnvHashMap::default(); capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000)); let shard_locations = ShardLocations::default(); - build_physical_indexing_plan( - &sources_to_schedule, - &capacities, - None, - &shard_locations, - None, - ); + build_physical_indexing_plan(&sources_to_schedule, &capacities, None, &shard_locations); } #[test] diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs index 0e8c64406d5..89d47fc50b4 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs @@ -22,12 +22,11 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use quickwit_proto::indexing::CpuCapacity; -use serde::Serialize; pub type SourceOrd = u32; pub type IndexerOrd = usize; -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Source { pub source_ord: SourceOrd, pub load_per_shard: NonZeroU32, @@ -80,7 +79,7 @@ impl Source { } } -#[derive(Debug, Serialize)] +#[derive(Debug)] pub struct SchedulingProblem { sources: Vec, indexer_cpu_capacities: Vec, @@ -175,7 +174,7 @@ impl SchedulingProblem { } } -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct IndexerAssignment { pub indexer_ord: IndexerOrd, pub num_shards_per_source: BTreeMap, @@ -233,7 +232,7 @@ impl IndexerAssignment { } } -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct SchedulingSolution { pub indexer_assignments: Vec, } diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index f9ee1dfa9bc..963734cd157 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -68,9 +68,6 @@ service ControlPlaneService { // Asks the control plane whether the shards listed in the request should be deleted or truncated. rpc AdviseResetShards(AdviseResetShardsRequest) returns (AdviseResetShardsResponse); - - /// Compute the indexing plan from scratch, without taking into account the current state of the indexers. - rpc RebuildPlan(RebuildPlanRequest) returns (RebuildPlanResponse); } // Shard API @@ -125,17 +122,3 @@ message AdviseResetShardsResponse { repeated quickwit.ingest.ShardIds shards_to_delete = 1; repeated quickwit.ingest.ShardIdPositions shards_to_truncate = 2; } - -/// Careful here! The Default implementation is used in different place of the code. -/// If you modify this struct make sure, its default's value aligns with the previous -/// behavior. -message RebuildPlanRequest { - bool reset = 1; - bool debug = 2; -} - -message RebuildPlanResponse { - string previous_solution_json = 1; - string problem_json = 2; - string new_solution_json = 3; -} diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 0bc740f3c9a..29d91cf6de0 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -75,29 +75,6 @@ pub struct AdviseResetShardsResponse { #[prost(message, repeated, tag = "2")] pub shards_to_truncate: ::prost::alloc::vec::Vec, } -/// / Careful here! The Default implementation is used in different place of the code. -/// / If you modify this struct make sure, its default's value aligns with the previous -/// / behavior. -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct RebuildPlanRequest { - #[prost(bool, tag = "1")] - pub reset: bool, - #[prost(bool, tag = "2")] - pub debug: bool, -} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct RebuildPlanResponse { - #[prost(string, tag = "1")] - pub previous_solution_json: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub problem_json: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub new_solution_json: ::prost::alloc::string::String, -} #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "snake_case")] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] @@ -198,11 +175,6 @@ pub trait ControlPlaneService: std::fmt::Debug + Send + Sync + 'static { &self, request: AdviseResetShardsRequest, ) -> crate::control_plane::ControlPlaneResult; - #[doc = "/ Compute the indexing plan from scratch, without taking into account the current state of the indexers."] - async fn rebuild_plan( - &self, - request: RebuildPlanRequest, - ) -> crate::control_plane::ControlPlaneResult; } #[derive(Debug, Clone)] pub struct ControlPlaneServiceClient { @@ -347,12 +319,6 @@ impl ControlPlaneService for ControlPlaneServiceClient { ) -> crate::control_plane::ControlPlaneResult { self.inner.0.advise_reset_shards(request).await } - async fn rebuild_plan( - &self, - request: RebuildPlanRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner.0.rebuild_plan(request).await - } } #[cfg(any(test, feature = "testsuite"))] pub mod mock_control_plane_service { @@ -425,12 +391,6 @@ pub mod mock_control_plane_service { ) -> crate::control_plane::ControlPlaneResult { self.inner.lock().await.advise_reset_shards(request).await } - async fn rebuild_plan( - &self, - request: super::RebuildPlanRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner.lock().await.rebuild_plan(request).await - } } } pub type BoxFuture = std::pin::Pin< @@ -570,22 +530,6 @@ impl tower::Service for InnerControlPlaneServiceClient Box::pin(fut) } } -impl tower::Service for InnerControlPlaneServiceClient { - type Response = RebuildPlanResponse; - type Error = crate::control_plane::ControlPlaneError; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } - fn call(&mut self, request: RebuildPlanRequest) -> Self::Future { - let svc = self.clone(); - let fut = async move { svc.0.rebuild_plan(request).await }; - Box::pin(fut) - } -} /// A tower service stack is a set of tower services. #[derive(Debug)] struct ControlPlaneServiceTowerServiceStack { @@ -631,11 +575,6 @@ struct ControlPlaneServiceTowerServiceStack { AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >, - rebuild_plan_svc: quickwit_common::tower::BoxService< - RebuildPlanRequest, - RebuildPlanResponse, - crate::control_plane::ControlPlaneError, - >, } #[async_trait::async_trait] impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { @@ -691,12 +630,6 @@ impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { ) -> crate::control_plane::ControlPlaneResult { self.advise_reset_shards_svc.clone().ready().await?.call(request).await } - async fn rebuild_plan( - &self, - request: RebuildPlanRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.rebuild_plan_svc.clone().ready().await?.call(request).await - } } type CreateIndexLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< @@ -778,16 +711,6 @@ type AdviseResetShardsLayer = quickwit_common::tower::BoxLayer< AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >; -type RebuildPlanLayer = quickwit_common::tower::BoxLayer< - quickwit_common::tower::BoxService< - RebuildPlanRequest, - RebuildPlanResponse, - crate::control_plane::ControlPlaneError, - >, - RebuildPlanRequest, - RebuildPlanResponse, - crate::control_plane::ControlPlaneError, ->; #[derive(Debug, Default)] pub struct ControlPlaneServiceTowerLayerStack { create_index_layers: Vec, @@ -798,7 +721,6 @@ pub struct ControlPlaneServiceTowerLayerStack { delete_source_layers: Vec, get_or_create_open_shards_layers: Vec, advise_reset_shards_layers: Vec, - rebuild_plan_layers: Vec, } impl ControlPlaneServiceTowerLayerStack { pub fn stack_layer(mut self, layer: L) -> Self @@ -1017,31 +939,6 @@ impl ControlPlaneServiceTowerLayerStack { crate::control_plane::ControlPlaneError, >, >>::Service as tower::Service>::Future: Send + 'static, - L: tower::Layer< - quickwit_common::tower::BoxService< - RebuildPlanRequest, - RebuildPlanResponse, - crate::control_plane::ControlPlaneError, - >, - > + Clone + Send + Sync + 'static, - , - >>::Service: tower::Service< - RebuildPlanRequest, - Response = RebuildPlanResponse, - Error = crate::control_plane::ControlPlaneError, - > + Clone + Send + Sync + 'static, - <, - >>::Service as tower::Service>::Future: Send + 'static, { self.create_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); @@ -1059,8 +956,6 @@ impl ControlPlaneServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.advise_reset_shards_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.rebuild_plan_layers - .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } pub fn stack_create_index_layer(mut self, layer: L) -> Self @@ -1231,25 +1126,6 @@ impl ControlPlaneServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn stack_rebuild_plan_layer(mut self, layer: L) -> Self - where - L: tower::Layer< - quickwit_common::tower::BoxService< - RebuildPlanRequest, - RebuildPlanResponse, - crate::control_plane::ControlPlaneError, - >, - > + Send + Sync + 'static, - L::Service: tower::Service< - RebuildPlanRequest, - Response = RebuildPlanResponse, - Error = crate::control_plane::ControlPlaneError, - > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - { - self.rebuild_plan_layers.push(quickwit_common::tower::BoxLayer::new(layer)); - self - } pub fn build(self, instance: T) -> ControlPlaneServiceClient where T: ControlPlaneService, @@ -1373,14 +1249,6 @@ impl ControlPlaneServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); - let rebuild_plan_svc = self - .rebuild_plan_layers - .into_iter() - .rev() - .fold( - quickwit_common::tower::BoxService::new(inner_client.clone()), - |svc, layer| layer.layer(svc), - ); let tower_svc_stack = ControlPlaneServiceTowerServiceStack { inner: inner_client, create_index_svc, @@ -1391,7 +1259,6 @@ impl ControlPlaneServiceTowerLayerStack { delete_source_svc, get_or_create_open_shards_svc, advise_reset_shards_svc, - rebuild_plan_svc, }; ControlPlaneServiceClient::new(tower_svc_stack) } @@ -1539,15 +1406,6 @@ where AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >, - > - + tower::Service< - RebuildPlanRequest, - Response = RebuildPlanResponse, - Error = crate::control_plane::ControlPlaneError, - Future = BoxFuture< - RebuildPlanResponse, - crate::control_plane::ControlPlaneError, - >, >, { async fn create_index( @@ -1602,12 +1460,6 @@ where ) -> crate::control_plane::ControlPlaneResult { self.clone().call(request).await } - async fn rebuild_plan( - &self, - request: RebuildPlanRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.clone().call(request).await - } } #[derive(Debug, Clone)] pub struct ControlPlaneServiceGrpcClientAdapter { @@ -1759,20 +1611,6 @@ where AdviseResetShardsRequest::rpc_name(), )) } - async fn rebuild_plan( - &self, - request: RebuildPlanRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner - .clone() - .rebuild_plan(request) - .await - .map(|response| response.into_inner()) - .map_err(|status| crate::error::grpc_status_to_service_error( - status, - RebuildPlanRequest::rpc_name(), - )) - } } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServerAdapter { @@ -1882,17 +1720,6 @@ for ControlPlaneServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } - async fn rebuild_plan( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - self.inner - .0 - .rebuild_plan(request.into_inner()) - .await - .map(tonic::Response::new) - .map_err(crate::error::grpc_error_to_grpc_status) - } } /// Generated client implementations. pub mod control_plane_service_grpc_client { @@ -2234,37 +2061,6 @@ pub mod control_plane_service_grpc_client { ); self.inner.unary(req, path, codec).await } - /// / Compute the indexing plan from scratch, without taking into account the current state of the indexers. - pub async fn rebuild_plan( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.control_plane.ControlPlaneService/RebuildPlan", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "quickwit.control_plane.ControlPlaneService", - "RebuildPlan", - ), - ); - self.inner.unary(req, path, codec).await - } } } /// Generated server implementations. @@ -2339,14 +2135,6 @@ pub mod control_plane_service_grpc_server { tonic::Response, tonic::Status, >; - /// / Compute the indexing plan from scratch, without taking into account the current state of the indexers. - async fn rebuild_plan( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServer { @@ -2814,52 +2602,6 @@ pub mod control_plane_service_grpc_server { }; Box::pin(fut) } - "/quickwit.control_plane.ControlPlaneService/RebuildPlan" => { - #[allow(non_camel_case_types)] - struct RebuildPlanSvc(pub Arc); - impl< - T: ControlPlaneServiceGrpc, - > tonic::server::UnaryService - for RebuildPlanSvc { - type Response = super::RebuildPlanResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - (*inner).rebuild_plan(request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = RebuildPlanSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index a9e98b0294d..85caadfdb28 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -126,12 +126,6 @@ impl RpcName for AdviseResetShardsRequest { } } -impl RpcName for RebuildPlanRequest { - fn rpc_name() -> &'static str { - "rebuild_plan_request" - } -} - impl GetOrCreateOpenShardsFailureReason { pub fn create_failure( &self, diff --git a/quickwit/quickwit-serve/src/developer_api/mod.rs b/quickwit/quickwit-serve/src/developer_api/mod.rs index fd4a3299520..2438e93146e 100644 --- a/quickwit/quickwit-serve/src/developer_api/mod.rs +++ b/quickwit/quickwit-serve/src/developer_api/mod.rs @@ -23,15 +23,12 @@ mod log_level; #[cfg_attr(not(feature = "pprof"), path = "pprof_disabled.rs")] mod pprof; -mod rebuild_plan; mod server; use debug::debug_handler; use log_level::log_level_handler; use pprof::pprof_handlers; use quickwit_cluster::Cluster; -use quickwit_proto::control_plane::ControlPlaneServiceClient; -use rebuild_plan::rebuild_plan_handler; pub(crate) use server::DeveloperApiServer; use warp::{Filter, Rejection}; @@ -43,7 +40,6 @@ use crate::EnvFilterReloadFn; pub struct DeveloperApi; pub(crate) fn developer_api_routes( - control_plane_client: ControlPlaneServiceClient, cluster: Cluster, env_filter_reload_fn: EnvFilterReloadFn, ) -> impl Filter + Clone { @@ -51,8 +47,7 @@ pub(crate) fn developer_api_routes( .and( debug_handler(cluster.clone()) .or(log_level_handler(env_filter_reload_fn.clone())) - .or(pprof_handlers()) - .or(rebuild_plan_handler(control_plane_client)), + .or(pprof_handlers()), ) .recover(recover_fn) } diff --git a/quickwit/quickwit-serve/src/developer_api/rebuild_plan.rs b/quickwit/quickwit-serve/src/developer_api/rebuild_plan.rs deleted file mode 100644 index 424101d9630..00000000000 --- a/quickwit/quickwit-serve/src/developer_api/rebuild_plan.rs +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use hyper::StatusCode; -use quickwit_proto::control_plane::{ - ControlPlaneResult, ControlPlaneService, ControlPlaneServiceClient, RebuildPlanRequest, - RebuildPlanResponse, -}; -use serde::{Deserialize, Serialize}; -use tracing::error; -use warp::Filter; - -use crate::with_arg; - -#[derive(Deserialize)] -struct RebuildPlanParams { - #[serde(default)] - reset: bool, -} - -#[derive(Serialize)] -struct RebuildPlanResp { - previous_solution: serde_json::Value, - problem: serde_json::Value, - new_solution: serde_json::Value, -} - -pub fn rebuild_plan_handler( - control_plane_client: ControlPlaneServiceClient, -) -> impl Filter + Clone { - warp::path("rebuild-plan") - .and(warp::get()) - .and(with_arg(control_plane_client)) - .and(warp::query::()) - .then( - |control_plane_client: ControlPlaneServiceClient, - rebuild_plan_params: RebuildPlanParams| async move { - let rebuild_plan_request = RebuildPlanRequest { - reset: rebuild_plan_params.reset, - debug: true, - }; - let rebuild_plan_result: ControlPlaneResult = - control_plane_client - .rebuild_plan(rebuild_plan_request) - .await; - match rebuild_plan_result { - Ok(RebuildPlanResponse { - previous_solution_json, - problem_json, - new_solution_json, - }) => { - let rebuild_plan_resp = RebuildPlanResp { - previous_solution: serde_json::from_str(&previous_solution_json) - .unwrap_or(serde_json::Value::Null), - problem: serde_json::from_str(&problem_json) - .unwrap_or(serde_json::Value::Null), - new_solution: serde_json::from_str(&new_solution_json) - .unwrap_or(serde_json::Value::Null), - }; - warp::reply::with_status( - warp::reply::json(&rebuild_plan_resp), - StatusCode::OK, - ) - } - Err(err) => { - error!("control plane error on rebuild plane: {:?}", err); - warp::reply::with_status( - warp::reply::json( - &serde_json::json!({"error_msg": "error rebuilding plan"}), - ), - StatusCode::INTERNAL_SERVER_ERROR, - ) - } - } - }, - ) -} diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index e874f21571a..e2818b35928 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -159,7 +159,6 @@ pub(crate) async fn start_rest_server( // `/api/developer/*` route. let developer_routes = developer_api_routes( - quickwit_services.control_plane_client.clone(), quickwit_services.cluster.clone(), quickwit_services.env_filter_reload_fn.clone(), );