Skip to content

Commit

Permalink
feat(storage): support more write-stop condition
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Jan 23, 2025
1 parent 60c8c79 commit 35bc722
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 33 deletions.
11 changes: 11 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,10 @@ message RiseCtlUpdateCompactionConfigRequest {
uint32 emergency_level0_sst_file_count = 25;
// The emergency compaction limitations for the level0 sub level partition
uint32 emergency_level0_sub_level_partition = 26;
// The limitation of the max sst size of the level0 to trigger the write stop
uint32 level0_stop_write_threshold_max_sst_count = 27;
// The limitation of the max sst size of the level0 to trigger the write stop
uint64 level0_stop_write_threshold_max_size = 28;
}
}
repeated uint64 compaction_group_ids = 1;
Expand Down Expand Up @@ -873,6 +877,13 @@ message CompactionConfig {

// The emergency compaction limitations for the level0 sub level partition
optional uint32 emergency_level0_sub_level_partition = 26;

// The limitation of the max sst count of the level0 to trigger the write stop
optional uint32 level0_stop_write_threshold_max_sst_count = 27;
// The limitation of the max sst size of the level0 to trigger the write stop
optional uint64 level0_stop_write_threshold_max_size = 28;


}

message TableStats {
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2217,6 +2217,8 @@ pub mod default {
const DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MIN_SIZE: u64 = 4 * MB;
const DEFAULT_EMERGENCY_LEVEL0_SST_FILE_COUNT: u32 = 2000; // > 50G / 32M = 1600
const DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION: u32 = 256;
const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT: u32 = 10000; // 10000 * 32M = 320G
const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SIZE: u64 = 300 * 1024 * MB; // 300GB

use crate::catalog::hummock::CompactionFilterFlag;

Expand Down Expand Up @@ -2307,6 +2309,14 @@ pub mod default {
pub fn emergency_level0_sub_level_partition() -> u32 {
DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION
}

pub fn level0_stop_write_threshold_max_sst_count() -> u32 {
DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT
}

pub fn level0_stop_write_threshold_max_size() -> u64 {
DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SIZE
}
}

pub mod object_store_config {
Expand Down
8 changes: 8 additions & 0 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub fn build_compaction_config_vec(
max_overlapping_level_size: Option<u64>,
emergency_level0_sst_file_count: Option<u32>,
emergency_level0_sub_level_partition: Option<u32>,
level0_stop_write_threshold_max_sst_count: Option<u32>,
level0_stop_write_threshold_max_size: Option<u64>,
) -> Vec<MutableConfig> {
let mut configs = vec![];
if let Some(c) = max_bytes_for_level_base {
Expand Down Expand Up @@ -139,6 +141,12 @@ pub fn build_compaction_config_vec(
if let Some(c) = emergency_level0_sub_level_partition {
configs.push(MutableConfig::EmergencyLevel0SubLevelPartition(c))
}
if let Some(c) = level0_stop_write_threshold_max_sst_count {
configs.push(MutableConfig::Level0StopWriteThresholdMaxSstCount(c))
}
if let Some(c) = level0_stop_write_threshold_max_size {
configs.push(MutableConfig::Level0StopWriteThresholdMaxSize(c))
}

configs
}
Expand Down
8 changes: 8 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ enum HummockCommands {
emergency_level0_sst_file_count: Option<u32>,
#[clap(long)]
emergency_level0_sub_level_partition: Option<u32>,
#[clap(long)]
level0_stop_write_threshold_max_sst_count: Option<u32>,
#[clap(long)]
level0_stop_write_threshold_max_size: Option<u64>,
},
/// Split given compaction group into two. Moves the given tables to the new group.
SplitCompactionGroup {
Expand Down Expand Up @@ -611,6 +615,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
max_overlapping_level_size,
emergency_level0_sst_file_count,
emergency_level0_sub_level_partition,
level0_stop_write_threshold_max_sst_count,
level0_stop_write_threshold_max_size,
}) => {
cmd_impl::hummock::update_compaction_config(
context,
Expand Down Expand Up @@ -646,6 +652,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
max_overlapping_level_size,
emergency_level0_sst_file_count,
emergency_level0_sub_level_partition,
level0_stop_write_threshold_max_sst_count,
level0_stop_write_threshold_max_size,
),
)
.await?
Expand Down
6 changes: 6 additions & 0 deletions src/meta/src/hummock/compaction/compaction_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ impl CompactionConfigBuilder {
emergency_level0_sub_level_partition: Some(
compaction_config::emergency_level0_sub_level_partition(),
),
level0_stop_write_threshold_max_sst_count: Some(
compaction_config::level0_stop_write_threshold_max_sst_count(),
),
level0_stop_write_threshold_max_size: Some(
compaction_config::level0_stop_write_threshold_max_size(),
),
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,11 @@ impl HummockManager {
}
};

let group_delta = GroupDelta::GroupConstruct(PbGroupConstruct {
let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
group_config: Some(config),
group_id,
..Default::default()
});
}));

group_deltas.push(group_delta);
}
Expand Down Expand Up @@ -611,6 +611,12 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi
MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
target.emergency_level0_sub_level_partition = Some(*c);
}
MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
target.level0_stop_write_threshold_max_sst_count = Some(*c);
}
MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
target.level0_stop_write_threshold_max_size = Some(*c);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,15 +452,17 @@ impl HummockManager {
new_version_delta.group_deltas.insert(
new_compaction_group_id,
GroupDeltas {
group_deltas: vec![GroupDelta::GroupConstruct(PbGroupConstruct {
group_config: Some(config.clone()),
group_id: new_compaction_group_id,
parent_group_id,
new_sst_start_id,
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32, // for compatibility
split_key: Some(split_key.into()),
})],
group_deltas: vec![GroupDelta::GroupConstruct(Box::new(
PbGroupConstruct {
group_config: Some(config.clone()),
group_id: new_compaction_group_id,
parent_group_id,
new_sst_start_id,
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32, // for compatibility
split_key: Some(split_key.into()),
}
))],
},
);
(new_compaction_group_id, config)
Expand Down
56 changes: 44 additions & 12 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1588,8 +1588,46 @@ pub fn check_cg_write_limit(
) -> WriteLimitType {
let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
let l0_sub_level_number = levels.l0.sub_levels.len();

// level count
if threshold < l0_sub_level_number {
return WriteLimitType::WriteStop(l0_sub_level_number, threshold);
return WriteLimitType::WriteStop(format!(
"WriteStop(l0_sub_level_number: {}, threshold: {}) too many L0 sub levels",
l0_sub_level_number, threshold
));
}

let threshold = compaction_config
.level0_stop_write_threshold_max_sst_count
.unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
as usize;
let l0_sst_count = levels
.l0
.sub_levels
.iter()
.map(|l| l.table_infos.len())
.sum();
if threshold < l0_sst_count {
return WriteLimitType::WriteStop(format!(
"WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst count",
l0_sst_count, threshold
));
}

let threshold = compaction_config
.level0_stop_write_threshold_max_size
.unwrap_or(compaction_config::level0_stop_write_threshold_max_size());
let l0_size = levels
.l0
.sub_levels
.iter()
.map(|l| l.table_infos.iter().map(|t| t.sst_size).sum::<u64>())
.sum::<u64>();
if threshold < l0_size {
return WriteLimitType::WriteStop(format!(
"WriteStop(l0_size: {}, threshold: {}) too many L0 sst count",
l0_size, threshold
));
}

WriteLimitType::Unlimited
Expand All @@ -1598,25 +1636,19 @@ pub fn check_cg_write_limit(
pub enum WriteLimitType {
Unlimited,

// (l0_level_count, threshold)
WriteStop(usize, usize),
WriteStop(String), // reason
}

impl WriteLimitType {
pub fn as_str(&self) -> String {
pub fn as_str(&self) -> &str {
match self {
Self::Unlimited => "Unlimited".to_owned(),
Self::WriteStop(l0_level_count, threshold) => {
format!(
"WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
l0_level_count, threshold
)
}
Self::Unlimited => "Unlimited",
Self::WriteStop(reason) => reason,
}
}

pub fn is_write_stop(&self) -> bool {
matches!(self, Self::WriteStop(_, _))
matches!(self, Self::WriteStop(_))
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ impl<'a> HummockVersionTransaction<'a> {
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
group_config: Some(compaction_group.compaction_config().as_ref().clone()),
group_id: compaction_group.group_id(),
parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
})));
}

// Append SSTs to a new version.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ pub(super) fn calc_new_write_limits(
.iter()
.map(|table_id| table_id.table_id)
.collect(),
reason: write_limit_type.as_str(),
reason: write_limit_type.as_str().to_owned(),
},
);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1610,13 +1610,13 @@ mod tests {
(
2,
GroupDeltas {
group_deltas: vec![GroupDelta::GroupConstruct(GroupConstruct {
group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
group_config: Some(CompactionConfig {
max_level: 6,
..Default::default()
}),
..Default::default()
})],
}))],
},
),
(
Expand Down
10 changes: 5 additions & 5 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ impl IntraLevelDelta {
pub enum GroupDeltaCommon<T> {
NewL0SubLevel(Vec<T>),
IntraLevel(IntraLevelDeltaCommon<T>),
GroupConstruct(PbGroupConstruct),
GroupConstruct(Box<PbGroupConstruct>),
GroupDestroy(PbGroupDestroy),
GroupMerge(PbGroupMerge),
}
Expand All @@ -943,7 +943,7 @@ where
GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
}
Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
GroupDeltaCommon::GroupConstruct(pb_group_construct)
GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
}
Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
GroupDeltaCommon::GroupDestroy(pb_group_destroy)
Expand Down Expand Up @@ -973,7 +973,7 @@ where
delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
},
GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct)),
delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
},
GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
Expand Down Expand Up @@ -1003,7 +1003,7 @@ where
delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
},
GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct.clone())),
delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
},
GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
Expand All @@ -1030,7 +1030,7 @@ where
GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
}
Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
GroupDeltaCommon::GroupConstruct(pb_group_construct.clone())
GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
}
Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
Expand Down

0 comments on commit 35bc722

Please sign in to comment.