Revert "Added rebuild plan rest debug handler. (#5150)" #5192

Merged 1 commit on Jul 5, 2024
26 changes: 12 additions & 14 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -43,7 +43,6 @@ use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMet
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult,
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest,
RebuildPlanRequest, RebuildPlanResponse,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
@@ -78,6 +77,9 @@ const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2);
#[derive(Debug)]
struct ControlPlanLoop;

#[derive(Debug, Default)]
struct RebuildPlan;

pub struct ControlPlane {
cluster_config: ClusterConfig,
cluster_change_stream_opt: Option<ClusterChangeStream>,
@@ -382,24 +384,22 @@ impl ControlPlane {
.next_rebuild_tracker
.next_rebuild_waiter();
self.rebuild_plan_debouncer
.self_send_with_cooldown::<RebuildPlanRequest>(ctx);
.self_send_with_cooldown::<RebuildPlan>(ctx);
next_rebuild_waiter
}
}

#[async_trait]
impl Handler<RebuildPlanRequest> for ControlPlane {
type Reply = ControlPlaneResult<RebuildPlanResponse>;
impl Handler<RebuildPlan> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
rebuild_plan_request: RebuildPlanRequest,
_message: RebuildPlan,
_ctx: &ActorContext<Self>,
) -> Result<ControlPlaneResult<RebuildPlanResponse>, ActorExitStatus> {
let rebuild_plan_response = self
.indexing_scheduler
.rebuild_plan(&self.model, rebuild_plan_request);
Ok(Ok(rebuild_plan_response))
) -> Result<(), ActorExitStatus> {
self.indexing_scheduler.rebuild_plan(&self.model);
Ok(())
}
}

@@ -925,8 +925,7 @@ impl Handler<IndexerJoined> for ControlPlane {
{
return convert_metastore_error::<()>(metastore_error).map(|_| ());
}
self.indexing_scheduler
.rebuild_plan(&self.model, RebuildPlanRequest::default());
self.indexing_scheduler.rebuild_plan(&self.model);
Ok(())
}
}
@@ -956,8 +955,7 @@ impl Handler<IndexerLeft> for ControlPlane {
{
return convert_metastore_error::<()>(metastore_error).map(|_| ());
}
self.indexing_scheduler
.rebuild_plan(&self.model, RebuildPlanRequest::default());
self.indexing_scheduler.rebuild_plan(&self.model);
Ok(())
}
}
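For context, here is a minimal, self-contained sketch of the pattern the control plane returns to after this revert: a payload-less `RebuildPlan` message whose handler replies with `()`, used purely as a debounced "recompute the plan" trigger. The `Scheduler` type and the synchronous `Handler` trait below are simplified stand-ins for illustration, not the quickwit-actors API.

```rust
/// Hypothetical stand-in for the control plane's scheduling state.
#[derive(Default)]
struct Scheduler {
    num_schedule_indexing_plan: u64,
}

/// Unit message: no `reset`/`debug` fields; it only triggers a rebuild.
#[derive(Debug, Default)]
struct RebuildPlan;

/// Simplified, synchronous stand-in for the actor `Handler` trait.
trait Handler<M> {
    type Reply;
    fn handle(&mut self, message: M) -> Self::Reply;
}

impl Handler<RebuildPlan> for Scheduler {
    type Reply = ();

    fn handle(&mut self, _message: RebuildPlan) -> Self::Reply {
        // Rebuild unconditionally from the last applied plan; the caller no
        // longer gets back a JSON description of the scheduling problem.
        self.num_schedule_indexing_plan += 1;
    }
}

fn main() {
    let mut scheduler = Scheduler::default();
    scheduler.handle(RebuildPlan);
    assert_eq!(scheduler.num_schedule_indexing_plan, 1);
}
```

The real handler is async and receives an `ActorContext`, as shown in the diff above, but the shape of the message and its empty reply is the same.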
45 changes: 9 additions & 36 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -28,7 +28,6 @@ use std::time::{Duration, Instant};

use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
};
@@ -206,12 +205,7 @@ impl IndexingScheduler {
//
// Prefer not calling this method directly, and instead call
// `ControlPlane::rebuild_indexing_plan_debounced`.
pub(crate) fn rebuild_plan(
&mut self,
model: &ControlPlaneModel,
rebuild_plan_request: RebuildPlanRequest,
) -> RebuildPlanResponse {
let RebuildPlanRequest { reset, debug } = rebuild_plan_request;
pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();

let notify_on_drop = self.next_rebuild_tracker.start_rebuild();
@@ -220,8 +214,6 @@

let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();

let mut rebuild_plan_response = RebuildPlanResponse::default();

let indexer_id_to_cpu_capacities: FnvHashMap<String, CpuCapacity> = indexers
.iter()
.filter_map(|indexer| {
@@ -237,28 +229,15 @@
if !sources.is_empty() {
warn!("no indexing capacity available, cannot schedule an indexing plan");
}
return rebuild_plan_response;
return;
};

let shard_locations = model.shard_locations();
let previous_applied_plan = if reset {
None
} else {
self.state.last_applied_physical_plan.as_ref()
};

let debug_output: Option<&mut RebuildPlanResponse> = if debug {
Some(&mut rebuild_plan_response)
} else {
None
};

let new_physical_plan = build_physical_indexing_plan(
&sources,
&indexer_id_to_cpu_capacities,
previous_applied_plan,
self.state.last_applied_physical_plan.as_ref(),
&shard_locations,
debug_output,
);
let shard_locality_metrics =
get_shard_locality_metrics(&new_physical_plan, &shard_locations);
@@ -270,12 +249,11 @@
);
// No need to apply the new plan as it is the same as the old one.
if plans_diff.is_empty() {
return rebuild_plan_response;
return;
}
}
self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop));
self.state.num_schedule_indexing_plan += 1;
rebuild_plan_response
}

/// Checks if the last applied plan corresponds to the running indexing tasks present in the
@@ -290,7 +268,7 @@
// If there is no plan, the node is probably starting and the scheduler did not find
// indexers yet. In this case, we want to schedule as soon as possible to find new
// indexers.
self.rebuild_plan(model, RebuildPlanRequest::default());
self.rebuild_plan(model);
return;
};
if let Some(last_applied_plan_timestamp) = self.state.last_applied_plan_timestamp {
@@ -312,7 +290,7 @@
);
if !indexing_plans_diff.has_same_nodes() {
info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan");
let _ = self.rebuild_plan(model, RebuildPlanRequest::default());
self.rebuild_plan(model);
} else if !indexing_plans_diff.has_same_tasks() {
// Some nodes may not have received their tasks; apply the plan again.
info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan");
@@ -855,13 +833,8 @@ mod tests {
indexer_max_loads.insert("indexer1".to_string(), mcpu(3_000));
indexer_max_loads.insert("indexer2".to_string(), mcpu(3_000));
let shard_locations = ShardLocations::default();
let physical_plan = build_physical_indexing_plan(
&sources[..],
&indexer_max_loads,
None,
&shard_locations,
None,
);
let physical_plan =
build_physical_indexing_plan(&sources[..], &indexer_max_loads, None, &shard_locations);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 2);
let indexing_tasks_1 = physical_plan.indexer("indexer1").unwrap();
assert_eq!(indexing_tasks_1.len(), 2);
@@ -892,7 +865,7 @@ mod tests {
indexer_max_loads.insert(indexer_id, mcpu(4_000));
}
let shard_locations = ShardLocations::default();
let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations, None);
let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations);
}
}

@@ -25,7 +25,6 @@ use std::num::NonZeroU32;

use fnv::{FnvHashMap, FnvHashSet};
use quickwit_common::rate_limited_debug;
use quickwit_proto::control_plane::RebuildPlanResponse;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::{PipelineUid, ShardId, SourceUid};
use scheduling_logic_model::{IndexerOrd, SourceOrd};
@@ -610,7 +609,6 @@ pub fn build_physical_indexing_plan(
indexer_id_to_cpu_capacities: &FnvHashMap<String, CpuCapacity>,
previous_plan_opt: Option<&PhysicalIndexingPlan>,
shard_locations: &ShardLocations,
mut debug_output_opt: Option<&mut RebuildPlanResponse>,
) -> PhysicalIndexingPlan {
// Asserts that the sources are valid.
check_sources(sources);
@@ -625,23 +623,13 @@

// Populate the previous solution, if any.
let mut previous_solution = problem.new_solution();

if let Some(previous_plan) = previous_plan_opt {
convert_physical_plan_to_solution(previous_plan, &id_to_ord_map, &mut previous_solution);
}

if let Some(debug_output) = &mut debug_output_opt {
debug_output.previous_solution_json = serde_json::to_string(&previous_solution).unwrap();
debug_output.problem_json = serde_json::to_string(&problem).unwrap();
}

// Compute the new scheduling solution using a heuristic.
let new_solution = scheduling_logic::solve(problem, previous_solution);

if let Some(debug_output) = debug_output_opt {
debug_output.new_solution_json = serde_json::to_string(&new_solution).unwrap();
}

// Convert the new scheduling solution back to a physical plan.
let new_physical_plan = convert_scheduling_solution_to_physical_plan(
&new_solution,
@@ -790,7 +778,6 @@ mod tests {
&indexer_id_to_cpu_capacities,
None,
&shard_locations,
None,
);
assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);

@@ -861,7 +848,6 @@ mod tests {
&indexer_id_to_cpu_capacities,
None,
&shard_locations,
None,
);
assert_eq!(plan.indexing_tasks_per_indexer().len(), num_indexers);
let metrics = get_shard_locality_metrics(&plan, &shard_locations);
@@ -890,13 +876,8 @@
{
indexer_max_loads.insert(indexer1.clone(), mcpu(1_999));
// This tests what happens when there isn't enough capacity on the cluster.
let physical_plan = build_physical_indexing_plan(
&sources,
&indexer_max_loads,
None,
&shard_locations,
None,
);
let physical_plan =
build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1);
let expected_tasks = physical_plan.indexer(&indexer1).unwrap();
assert_eq!(expected_tasks.len(), 2);
@@ -905,13 +886,8 @@
{
indexer_max_loads.insert(indexer1.clone(), mcpu(2_000));
// This tests what happens when there isn't enough capacity on the cluster.
let physical_plan = build_physical_indexing_plan(
&sources,
&indexer_max_loads,
None,
&shard_locations,
None,
);
let physical_plan =
build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1);
let expected_tasks = physical_plan.indexer(&indexer1).unwrap();
assert_eq!(expected_tasks.len(), 2);
@@ -979,7 +955,6 @@
&indexer_id_to_cpu_capacities,
Some(&indexing_plan),
&shard_locations,
None,
);
let indexing_tasks = new_plan.indexer("node1").unwrap();
assert_eq!(indexing_tasks.len(), 2);
@@ -1020,7 +995,6 @@
&indexer_id_to_cpu_capacities,
Some(&indexing_plan),
&shard_locations,
None,
);
let mut indexing_tasks = new_plan.indexer(NODE).unwrap().to_vec();
for indexing_task in &mut indexing_tasks {
@@ -1203,13 +1177,7 @@ mod tests {
let mut capacities = FnvHashMap::default();
capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000));
let shard_locations = ShardLocations::default();
build_physical_indexing_plan(
&sources_to_schedule,
&capacities,
None,
&shard_locations,
None,
);
build_physical_indexing_plan(&sources_to_schedule, &capacities, None, &shard_locations);
}

#[test]
@@ -22,12 +22,11 @@ use std::collections::BTreeMap;
use std::num::NonZeroU32;

use quickwit_proto::indexing::CpuCapacity;
use serde::Serialize;

pub type SourceOrd = u32;
pub type IndexerOrd = usize;

#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Source {
pub source_ord: SourceOrd,
pub load_per_shard: NonZeroU32,
@@ -80,7 +79,7 @@ impl Source {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug)]
pub struct SchedulingProblem {
sources: Vec<Source>,
indexer_cpu_capacities: Vec<CpuCapacity>,
@@ -175,7 +174,7 @@ impl SchedulingProblem {
}
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IndexerAssignment {
pub indexer_ord: IndexerOrd,
pub num_shards_per_source: BTreeMap<SourceOrd, u32>,
@@ -233,7 +232,7 @@ impl IndexerAssignment {
}
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SchedulingSolution {
pub indexer_assignments: Vec<IndexerAssignment>,
}
17 changes: 0 additions & 17 deletions quickwit/quickwit-proto/protos/quickwit/control_plane.proto
@@ -68,9 +68,6 @@ service ControlPlaneService {

// Asks the control plane whether the shards listed in the request should be deleted or truncated.
rpc AdviseResetShards(AdviseResetShardsRequest) returns (AdviseResetShardsResponse);

/// Compute the indexing plan from scratch, without taking into account the current state of the indexers.
rpc RebuildPlan(RebuildPlanRequest) returns (RebuildPlanResponse);
}

// Shard API
@@ -125,17 +122,3 @@ message AdviseResetShardsResponse {
repeated quickwit.ingest.ShardIds shards_to_delete = 1;
repeated quickwit.ingest.ShardIdPositions shards_to_truncate = 2;
}

/// Careful here! The Default implementation is used in different places of the code.
/// If you modify this struct, make sure its default value aligns with the previous
/// behavior.
message RebuildPlanRequest {
bool reset = 1;
bool debug = 2;
}

message RebuildPlanResponse {
string previous_solution_json = 1;
string problem_json = 2;
string new_solution_json = 3;
}