diff --git a/proto/hummock.proto b/proto/hummock.proto index f4565e2dd2dfa..36bc3a9660cdb 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -665,6 +665,10 @@ message RiseCtlUpdateCompactionConfigRequest { uint32 emergency_level0_sst_file_count = 25; // The emergency compaction limitations for the level0 sub level partition uint32 emergency_level0_sub_level_partition = 26; + // The limitation of the max sst count of the level0 to trigger the write stop + uint32 level0_stop_write_threshold_max_sst_count = 27; + // The limitation of the max sst size of the level0 to trigger the write stop + uint64 level0_stop_write_threshold_max_size = 28; } } repeated uint64 compaction_group_ids = 1; @@ -873,6 +877,13 @@ message CompactionConfig { // The emergency compaction limitations for the level0 sub level partition optional uint32 emergency_level0_sub_level_partition = 26; + + // The limitation of the max sst count of the level0 to trigger the write stop + optional uint32 level0_stop_write_threshold_max_sst_count = 27; + // The limitation of the max sst size of the level0 to trigger the write stop + optional uint64 level0_stop_write_threshold_max_size = 28; + + } message TableStats { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index f4476ef837c65..a8d0b4e149b19 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -2217,6 +2217,8 @@ pub mod default { const DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MIN_SIZE: u64 = 4 * MB; const DEFAULT_EMERGENCY_LEVEL0_SST_FILE_COUNT: u32 = 2000; // > 50G / 32M = 1600 const DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION: u32 = 256; + const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT: u32 = 10000; // 10000 * 32M = 320G + const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SIZE: u64 = 300 * 1024 * MB; // 300GB use crate::catalog::hummock::CompactionFilterFlag; @@ -2307,6 +2309,14 @@ pub mod default { pub fn emergency_level0_sub_level_partition() -> u32 { DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION } + + pub 
fn level0_stop_write_threshold_max_sst_count() -> u32 { + DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT + } + + pub fn level0_stop_write_threshold_max_size() -> u64 { + DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SIZE + } } pub mod object_store_config { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index c3358043cdfe2..03a2a7e7e10a8 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -71,6 +71,8 @@ pub fn build_compaction_config_vec( max_overlapping_level_size: Option, emergency_level0_sst_file_count: Option, emergency_level0_sub_level_partition: Option, + level0_stop_write_threshold_max_sst_count: Option, + level0_stop_write_threshold_max_size: Option, ) -> Vec { let mut configs = vec![]; if let Some(c) = max_bytes_for_level_base { @@ -139,6 +141,12 @@ pub fn build_compaction_config_vec( if let Some(c) = emergency_level0_sub_level_partition { configs.push(MutableConfig::EmergencyLevel0SubLevelPartition(c)) } + if let Some(c) = level0_stop_write_threshold_max_sst_count { + configs.push(MutableConfig::Level0StopWriteThresholdMaxSstCount(c)) + } + if let Some(c) = level0_stop_write_threshold_max_size { + configs.push(MutableConfig::Level0StopWriteThresholdMaxSize(c)) + } configs } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index c90d551634f4d..0bd77d2608614 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -200,6 +200,10 @@ enum HummockCommands { emergency_level0_sst_file_count: Option, #[clap(long)] emergency_level0_sub_level_partition: Option, + #[clap(long)] + level0_stop_write_threshold_max_sst_count: Option, + #[clap(long)] + level0_stop_write_threshold_max_size: Option, }, /// Split given compaction group into two. Moves the given tables to the new group. 
SplitCompactionGroup { @@ -611,6 +615,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_overlapping_level_size, emergency_level0_sst_file_count, emergency_level0_sub_level_partition, + level0_stop_write_threshold_max_sst_count, + level0_stop_write_threshold_max_size, }) => { cmd_impl::hummock::update_compaction_config( context, @@ -646,6 +652,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_overlapping_level_size, emergency_level0_sst_file_count, emergency_level0_sub_level_partition, + level0_stop_write_threshold_max_sst_count, + level0_stop_write_threshold_max_size, ), ) .await? diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs index 5694c1bfe0234..0bc8b4234bcac 100644 --- a/src/meta/src/hummock/compaction/compaction_config.rs +++ b/src/meta/src/hummock/compaction/compaction_config.rs @@ -78,6 +78,12 @@ impl CompactionConfigBuilder { emergency_level0_sub_level_partition: Some( compaction_config::emergency_level0_sub_level_partition(), ), + level0_stop_write_threshold_max_sst_count: Some( + compaction_config::level0_stop_write_threshold_max_sst_count(), + ), + level0_stop_write_threshold_max_size: Some( + compaction_config::level0_stop_write_threshold_max_size(), + ), }, } } diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index bcd514d05dc78..77d5c66c5042f 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -247,11 +247,11 @@ impl HummockManager { } }; - let group_delta = GroupDelta::GroupConstruct(PbGroupConstruct { + let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct { group_config: Some(config), group_id, ..Default::default() - }); + })); group_deltas.push(group_delta); } @@ -611,6 +611,12 @@ fn 
update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi MutableConfig::EmergencyLevel0SubLevelPartition(c) => { target.emergency_level0_sub_level_partition = Some(*c); } + MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => { + target.level0_stop_write_threshold_max_sst_count = Some(*c); + } + MutableConfig::Level0StopWriteThresholdMaxSize(c) => { + target.level0_stop_write_threshold_max_size = Some(*c); + } } } } diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index f346902bc242d..266718c2c48b5 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -452,15 +452,17 @@ impl HummockManager { new_version_delta.group_deltas.insert( new_compaction_group_id, GroupDeltas { - group_deltas: vec![GroupDelta::GroupConstruct(PbGroupConstruct { - group_config: Some(config.clone()), - group_id: new_compaction_group_id, - parent_group_id, - new_sst_start_id, - table_ids: vec![], - version: CompatibilityVersion::SplitGroupByTableId as i32, // for compatibility - split_key: Some(split_key.into()), - })], + group_deltas: vec![GroupDelta::GroupConstruct(Box::new( + PbGroupConstruct { + group_config: Some(config.clone()), + group_id: new_compaction_group_id, + parent_group_id, + new_sst_start_id, + table_ids: vec![], + version: CompatibilityVersion::SplitGroupByTableId as i32, // for compatibility + split_key: Some(split_key.into()), + } + ))], }, ); (new_compaction_group_id, config) diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index a996787ef9bd3..88a731b6e84f0 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -1588,8 +1588,46 @@ pub fn check_cg_write_limit( ) -> WriteLimitType { let threshold = 
compaction_config.level0_stop_write_threshold_sub_level_number as usize; let l0_sub_level_number = levels.l0.sub_levels.len(); + + // level count if threshold < l0_sub_level_number { - return WriteLimitType::WriteStop(l0_sub_level_number, threshold); + return WriteLimitType::WriteStop(format!( + "WriteStop(l0_sub_level_number: {}, threshold: {}) too many L0 sub levels", + l0_sub_level_number, threshold + )); + } + + let threshold = compaction_config + .level0_stop_write_threshold_max_sst_count + .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count()) + as usize; + let l0_sst_count = levels + .l0 + .sub_levels + .iter() + .map(|l| l.table_infos.len()) + .sum(); + if threshold < l0_sst_count { + return WriteLimitType::WriteStop(format!( + "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst count", + l0_sst_count, threshold + )); + } + + let threshold = compaction_config + .level0_stop_write_threshold_max_size + .unwrap_or(compaction_config::level0_stop_write_threshold_max_size()); + let l0_size = levels + .l0 + .sub_levels + .iter() + .map(|l| l.table_infos.iter().map(|t| t.sst_size).sum::()) + .sum::(); + if threshold < l0_size { + return WriteLimitType::WriteStop(format!( + "WriteStop(l0_size: {}, threshold: {}) too many L0 sst count", + l0_size, threshold + )); } WriteLimitType::Unlimited @@ -1598,25 +1636,19 @@ pub fn check_cg_write_limit( pub enum WriteLimitType { Unlimited, - // (l0_level_count, threshold) - WriteStop(usize, usize), + WriteStop(String), // reason } impl WriteLimitType { - pub fn as_str(&self) -> String { + pub fn as_str(&self) -> &str { match self { - Self::Unlimited => "Unlimited".to_owned(), - Self::WriteStop(l0_level_count, threshold) => { - format!( - "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels", - l0_level_count, threshold - ) - } + Self::Unlimited => "Unlimited", + Self::WriteStop(reason) => reason, } } pub fn is_write_stop(&self) -> bool { - matches!(self, Self::WriteStop(_, _)) + 
matches!(self, Self::WriteStop(_)) } } diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 335c8e8d157e5..668b15483669c 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -129,7 +129,7 @@ impl<'a> HummockVersionTransaction<'a> { .group_deltas; #[expect(deprecated)] - group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { + group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct { group_config: Some(compaction_group.compaction_config().as_ref().clone()), group_id: compaction_group.group_id(), parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, @@ -137,7 +137,7 @@ impl<'a> HummockVersionTransaction<'a> { table_ids: vec![], version: CompatibilityVersion::SplitGroupByTableId as i32, split_key: None, - })); + }))); } // Append SSTs to a new version. diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 07d99992e10b1..db8a0ebf53d4e 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -300,7 +300,7 @@ pub(super) fn calc_new_write_limits( .iter() .map(|table_id| table_id.table_id) .collect(), - reason: write_limit_type.as_str(), + reason: write_limit_type.as_str().to_owned(), }, ); continue; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 8ea85ec57dbe3..70567d5e19fa6 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1610,13 +1610,13 @@ mod tests { ( 2, GroupDeltas { - group_deltas: vec![GroupDelta::GroupConstruct(GroupConstruct { + group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct { group_config: Some(CompactionConfig { max_level: 6, ..Default::default() }), 
..Default::default() - })], + }))], }, ), ( diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 84cba3804a96c..da8b25867ca40 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -926,7 +926,7 @@ impl IntraLevelDelta { pub enum GroupDeltaCommon { NewL0SubLevel(Vec), IntraLevel(IntraLevelDeltaCommon), - GroupConstruct(PbGroupConstruct), + GroupConstruct(Box), GroupDestroy(PbGroupDestroy), GroupMerge(PbGroupMerge), } @@ -943,7 +943,7 @@ where GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta)) } Some(PbDeltaType::GroupConstruct(pb_group_construct)) => { - GroupDeltaCommon::GroupConstruct(pb_group_construct) + GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct)) } Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => { GroupDeltaCommon::GroupDestroy(pb_group_destroy) @@ -973,7 +973,7 @@ where delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())), }, GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta { - delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct)), + delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)), }, GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)), @@ -1003,7 +1003,7 @@ where delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())), }, GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta { - delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct.clone())), + delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())), }, GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)), @@ -1030,7 +1030,7 @@ where GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta)) } Some(PbDeltaType::GroupConstruct(pb_group_construct)) => { - 
GroupDeltaCommon::GroupConstruct(pb_group_construct.clone()) + GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone())) } Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => { GroupDeltaCommon::GroupDestroy(*pb_group_destroy)