Skip to content

Commit

Permalink
Emitting both a short term average and a long term average of shard (#…
Browse files Browse the repository at this point in the history
…5202)

* Emitting both a short term average and a long term average of shard
throughput.

Scaling up relies on the short term average in order to rapidly
react to a change in throughput, while scaling down and the indexing scheduler relies on the long term average.

* added comment
  • Loading branch information
fulmicoton authored Jul 16, 2024
1 parent 4e7dd61 commit fbd78bc
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
let num_shards = shard_entries.len().max(1) as u64;
let average_throughput_per_shard_bytes: u64 = shard_entries
.iter()
.map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB)
.map(|shard_entry| shard_entry.long_term_ingestion_rate.0 as u64 * bytesize::MIB)
.sum::<u64>()
.div_ceil(num_shards)
// A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is
Expand Down
88 changes: 54 additions & 34 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use tokio::task::JoinHandle;
use tracing::{debug, enabled, error, info, warn, Level};
use ulid::Ulid;

use super::scaling_arbiter::ScalingArbiter;
use crate::control_plane::ControlPlane;
use crate::ingest::wait_handle::WaitHandle;
use crate::model::{ControlPlaneModel, ScalingMode, ShardEntry, ShardStats};
Expand Down Expand Up @@ -102,10 +103,7 @@ pub struct IngestController {
// This lock ensures that only one rebalance operation is performed at a time.
rebalance_lock: Arc<Mutex<()>>,
pub stats: IngestControllerStats,
// Threshold in MiB/s below which we decrease the number of shards.
scale_down_shards_threshold_mib_per_sec: f32,
// Threshold in MiB/s above which we increase the number of shards.
scale_up_shards_threshold_mib_per_sec: f32,
scaling_arbiter: ScalingArbiter,
}

impl fmt::Debug for IngestController {
Expand Down Expand Up @@ -192,9 +190,9 @@ impl IngestController {
replication_factor,
rebalance_lock: Arc::new(Mutex::new(())),
stats: IngestControllerStats::default(),
scale_up_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec * 0.8,
scale_down_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec
* 0.2,
scaling_arbiter: ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(
max_shard_ingestion_throughput_mib_per_sec,
),
}
}

Expand Down Expand Up @@ -291,20 +289,31 @@ impl IngestController {
&local_shards_update.source_uid,
&local_shards_update.shard_infos,
);
if shard_stats.avg_ingestion_rate >= self.scale_up_shards_threshold_mib_per_sec {
self.try_scale_up_shards(local_shards_update.source_uid, shard_stats, model, progress)
let Some(scaling_mode) = self.scaling_arbiter.should_scale(shard_stats) else {
return Ok(());
};

match scaling_mode {
ScalingMode::Up => {
self.try_scale_up_shards(
local_shards_update.source_uid,
shard_stats,
model,
progress,
)
.await?;
} else if shard_stats.avg_ingestion_rate <= self.scale_down_shards_threshold_mib_per_sec
&& shard_stats.num_open_shards > 1
{
self.try_scale_down_shards(
local_shards_update.source_uid,
shard_stats,
model,
progress,
)
.await?;
}
ScalingMode::Down => {
self.try_scale_down_shards(
local_shards_update.source_uid,
shard_stats,
model,
progress,
)
.await?;
}
}

Ok(())
}

Expand Down Expand Up @@ -1111,8 +1120,8 @@ fn find_scale_down_candidate(
*num_shards += 1;

if shard
.ingestion_rate
.cmp(&candidate.ingestion_rate)
.long_term_ingestion_rate
.cmp(&candidate.long_term_ingestion_rate)
.then_with(|| shard.shard_id.cmp(&candidate.shard_id))
.is_gt()
{
Expand Down Expand Up @@ -2082,13 +2091,14 @@ mod tests {
let shard_entries: Vec<ShardEntry> = model.all_shards().cloned().collect();

assert_eq!(shard_entries.len(), 1);
assert_eq!(shard_entries[0].ingestion_rate, 0);
assert_eq!(shard_entries[0].short_term_ingestion_rate, 0);

// Test update shard ingestion rate but no scale down because num open shards is 1.
let shard_infos = BTreeSet::from_iter([ShardInfo {
shard_id: ShardId::from(1),
shard_state: ShardState::Open,
ingestion_rate: RateMibPerSec(1),
short_term_ingestion_rate: RateMibPerSec(1),
long_term_ingestion_rate: RateMibPerSec(1),
}]);
let local_shards_update = LocalShardsUpdate {
leader_id: "test-ingester".into(),
Expand All @@ -2103,7 +2113,7 @@ mod tests {

let shard_entries: Vec<ShardEntry> = model.all_shards().cloned().collect();
assert_eq!(shard_entries.len(), 1);
assert_eq!(shard_entries[0].ingestion_rate, 1);
assert_eq!(shard_entries[0].short_term_ingestion_rate, 1);

// Test update shard ingestion rate with failing scale down.
let shards = vec![Shard {
Expand Down Expand Up @@ -2155,12 +2165,14 @@ mod tests {
ShardInfo {
shard_id: ShardId::from(1),
shard_state: ShardState::Open,
ingestion_rate: RateMibPerSec(1),
short_term_ingestion_rate: RateMibPerSec(1),
long_term_ingestion_rate: RateMibPerSec(1),
},
ShardInfo {
shard_id: ShardId::from(2),
shard_state: ShardState::Open,
ingestion_rate: RateMibPerSec(1),
short_term_ingestion_rate: RateMibPerSec(1),
long_term_ingestion_rate: RateMibPerSec(1),
},
]);
let local_shards_update = LocalShardsUpdate {
Expand All @@ -2178,12 +2190,14 @@ mod tests {
ShardInfo {
shard_id: ShardId::from(1),
shard_state: ShardState::Open,
ingestion_rate: RateMibPerSec(4),
short_term_ingestion_rate: RateMibPerSec(4),
long_term_ingestion_rate: RateMibPerSec(4),
},
ShardInfo {
shard_id: ShardId::from(2),
shard_state: ShardState::Open,
ingestion_rate: RateMibPerSec(4),
short_term_ingestion_rate: RateMibPerSec(4),
long_term_ingestion_rate: RateMibPerSec(4),
},
]);
let local_shards_update = LocalShardsUpdate {
Expand Down Expand Up @@ -2544,32 +2558,38 @@ mod tests {
ShardInfo {
shard_id: ShardId::from(1),
shard_state: ShardState::Open,
ingestion_rate: quickwit_ingest::RateMibPerSec(1),
short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(1),
long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(1),
},
ShardInfo {
shard_id: ShardId::from(2),
shard_state: ShardState::Open,
ingestion_rate: quickwit_ingest::RateMibPerSec(2),
short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(2),
long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(2),
},
ShardInfo {
shard_id: ShardId::from(3),
shard_state: ShardState::Open,
ingestion_rate: quickwit_ingest::RateMibPerSec(3),
short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(3),
long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(3),
},
ShardInfo {
shard_id: ShardId::from(4),
shard_state: ShardState::Open,
ingestion_rate: quickwit_ingest::RateMibPerSec(4),
short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(4),
long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(4),
},
ShardInfo {
shard_id: ShardId::from(5),
shard_state: ShardState::Open,
ingestion_rate: quickwit_ingest::RateMibPerSec(5),
short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(5),
long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(5),
},
ShardInfo {
shard_id: ShardId::from(6),
shard_state: ShardState::Open,
ingestion_rate: quickwit_ingest::RateMibPerSec(6),
short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(6),
long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(6),
},
]);
model.update_shards(&source_uid, &shard_infos);
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

pub(crate) mod ingest_controller;
mod scaling_arbiter;
mod wait_handle;

pub use ingest_controller::IngestController;
Expand Down
128 changes: 128 additions & 0 deletions quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use crate::model::{ScalingMode, ShardStats};

pub(crate) struct ScalingArbiter {
// Threshold in MiB/s below which we decrease the number of shards.
scale_down_shards_threshold_mib_per_sec: f32,

// Threshold in MiB/s above which we increase the number of shards.
// In order to make scaling up reactive, the decision is mostly taken by inspecting the short
// term threshold.
//
// However, this threshold is based on a very short window of time: 5s.
//
// In order to avoid having back and forth scaling up and down in response to temporary
// punctual spikes of a few MB, we also compute what would be the long term ingestion rate
// after scaling up, and double check that it is above the long term threshold.
scale_up_shards_short_term_threshold_mib_per_sec: f32,
scale_up_shards_long_term_threshold_mib_per_sec: f32,
}

impl ScalingArbiter {
pub fn with_max_shard_ingestion_throughput_mib_per_sec(
max_shard_throughput_mib_per_sec: f32,
) -> ScalingArbiter {
ScalingArbiter {
scale_up_shards_short_term_threshold_mib_per_sec: max_shard_throughput_mib_per_sec
* 0.8f32,
scale_up_shards_long_term_threshold_mib_per_sec: max_shard_throughput_mib_per_sec
* 0.3f32,
scale_down_shards_threshold_mib_per_sec: max_shard_throughput_mib_per_sec * 0.2f32,
}
}

pub(crate) fn should_scale(&self, shard_stats: ShardStats) -> Option<ScalingMode> {
// We scale up based on the short term threshold to scale up more aggressively.
if shard_stats.avg_short_term_ingestion_rate
>= self.scale_up_shards_short_term_threshold_mib_per_sec
{
let long_term_ingestion_rate_after_scale_up = shard_stats.avg_long_term_ingestion_rate
* (shard_stats.num_open_shards as f32)
/ (shard_stats.num_open_shards as f32 + 1.0f32);
if long_term_ingestion_rate_after_scale_up
>= self.scale_up_shards_long_term_threshold_mib_per_sec
{
return Some(ScalingMode::Up);
}
}

// On the other hand, we scale down based on the long term ingestion rate, to avoid
// scaling down just due to a very short drop in ingestion
if shard_stats.avg_long_term_ingestion_rate <= self.scale_down_shards_threshold_mib_per_sec
&& shard_stats.num_open_shards > 1
{
return Some(ScalingMode::Down);
}

None
}
}

#[cfg(test)]
mod tests {
use super::ScalingArbiter;
use crate::model::{ScalingMode, ShardStats};

#[test]
fn test_scaling_arbiter() {
let scaling_arbiter = ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(10.0);
assert_eq!(
scaling_arbiter.should_scale(ShardStats {
num_open_shards: 1,
avg_short_term_ingestion_rate: 5.0,
avg_long_term_ingestion_rate: 6.0,
}),
None
);
assert_eq!(
scaling_arbiter.should_scale(ShardStats {
num_open_shards: 1,
avg_short_term_ingestion_rate: 8.1,
avg_long_term_ingestion_rate: 8.1,
}),
Some(ScalingMode::Up)
);
assert_eq!(
scaling_arbiter.should_scale(ShardStats {
num_open_shards: 2,
avg_short_term_ingestion_rate: 3.0,
avg_long_term_ingestion_rate: 1.5,
}),
Some(ScalingMode::Down)
);
assert_eq!(
scaling_arbiter.should_scale(ShardStats {
num_open_shards: 1,
avg_short_term_ingestion_rate: 3.0,
avg_long_term_ingestion_rate: 1.5,
}),
None,
);
assert_eq!(
scaling_arbiter.should_scale(ShardStats {
num_open_shards: 1,
avg_short_term_ingestion_rate: 8.0f32,
avg_long_term_ingestion_rate: 3.0f32,
}),
None,
);
}
}
Loading

0 comments on commit fbd78bc

Please sign in to comment.