Skip to content

Commit

Permalink
Removing permits from scaling up/down methods (#5463)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Oct 1, 2024
1 parent 6373edb commit 7b22075
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 36 deletions.
20 changes: 10 additions & 10 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,10 +676,8 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> MetastoreResult<()> {
const NUM_PERMITS: u64 = 1;

if !model
.acquire_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap_or(false)
{
return Ok(());
Expand All @@ -698,7 +696,7 @@ impl IngestController {
if successful_source_uids.is_empty() {
// We did not manage to create the shard.
// We can release our permit.
model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Up);
warn!(
index_uid=%source_uid.index_uid,
source_id=%source_uid.source_id,
Expand All @@ -722,7 +720,7 @@ impl IngestController {
source_id=%source_uid.source_id,
"scaling up number of shards to {new_num_open_shards} failed: {metastore_error:?}"
);
model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Up);
Err(metastore_error)
}
}
Expand Down Expand Up @@ -860,10 +858,12 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> MetastoreResult<()> {
const NUM_PERMITS: u64 = 1;
if shard_stats.num_open_shards == 0 {
return Ok(());
}

if !model
.acquire_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap_or(false)
{
return Ok(());
Expand All @@ -876,12 +876,12 @@ impl IngestController {
"scaling down number of shards to {new_num_open_shards}"
);
let Some((leader_id, shard_id)) = find_scale_down_candidate(&source_uid, model) else {
model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
};
info!("scaling down shard {shard_id} from {leader_id}");
let Some(ingester) = self.ingester_pool.get(&leader_id) else {
model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
};
let shard_pkeys = vec![ShardPKey {
Expand All @@ -896,7 +896,7 @@ impl IngestController {
.await
{
warn!("failed to scale down number of shards: {error}");
model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
}
model.close_shards(&source_uid, &[shard_id]);
Expand Down
12 changes: 3 additions & 9 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,25 +378,19 @@ impl ControlPlaneModel {
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
num_permits: u64,
) -> Option<bool> {
self.shard_table
.acquire_scaling_permits(source_uid, scaling_mode, num_permits)
.acquire_scaling_permits(source_uid, scaling_mode)
}

pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
self.shard_table
.drain_scaling_permits(source_uid, scaling_mode)
}

pub fn release_scaling_permits(
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
num_permits: u64,
) {
pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
self.shard_table
.release_scaling_permits(source_uid, scaling_mode, num_permits)
.release_scaling_permits(source_uid, scaling_mode)
}
}

Expand Down
28 changes: 11 additions & 17 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,13 @@ impl ShardTable {
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
num_permits: u64,
) -> Option<bool> {
let table_entry = self.table_entries.get_mut(source_uid)?;
let scaling_rate_limiter = match scaling_mode {
ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter,
ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter,
};
Some(scaling_rate_limiter.acquire(num_permits))
Some(scaling_rate_limiter.acquire(1))
}

pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
Expand All @@ -564,18 +563,13 @@ impl ShardTable {
}
}

pub fn release_scaling_permits(
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
num_permits: u64,
) {
pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
if let Some(table_entry) = self.table_entries.get_mut(source_uid) {
let scaling_rate_limiter = match scaling_mode {
ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter,
ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter,
};
scaling_rate_limiter.release(num_permits);
scaling_rate_limiter.release(1);
}
}
}
Expand Down Expand Up @@ -1058,7 +1052,7 @@ mod tests {
source_id: source_id.clone(),
};
assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.is_none());

shard_table.add_source(&index_uid, &source_id);
Expand All @@ -1071,7 +1065,7 @@ mod tests {
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap());

let new_available_permits = shard_table
Expand All @@ -1096,7 +1090,7 @@ mod tests {
source_id: source_id.clone(),
};
assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.is_none());

shard_table.add_source(&index_uid, &source_id);
Expand All @@ -1109,7 +1103,7 @@ mod tests {
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap());

let new_available_permits = shard_table
Expand Down Expand Up @@ -1143,10 +1137,10 @@ mod tests {
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap());

shard_table.release_scaling_permits(&source_uid, ScalingMode::Up, 1);
shard_table.release_scaling_permits(&source_uid, ScalingMode::Up);

let new_available_permits = shard_table
.table_entries
Expand Down Expand Up @@ -1179,10 +1173,10 @@ mod tests {
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap());

shard_table.release_scaling_permits(&source_uid, ScalingMode::Down, 1);
shard_table.release_scaling_permits(&source_uid, ScalingMode::Down);

let new_available_permits = shard_table
.table_entries
Expand Down

0 comments on commit 7b22075

Please sign in to comment.