From 35bc7229cae08857f961f090fbd4313ef642b538 Mon Sep 17 00:00:00 2001
From: Li0k <yuli@singularity-data.com>
Date: Thu, 23 Jan 2025 21:35:38 +0800
Subject: [PATCH] feat(storage): support more write-stop condition

---
 proto/hummock.proto                           | 11 ++++
 src/common/src/config.rs                      | 10 ++++
 .../src/cmd_impl/hummock/compaction_group.rs  |  8 +++
 src/ctl/src/lib.rs                            |  8 +++
 .../hummock/compaction/compaction_config.rs   |  6 ++
 .../compaction/compaction_group_manager.rs    | 10 +++-
 .../compaction/compaction_group_schedule.rs   | 20 ++++---
 .../src/hummock/manager/compaction/mod.rs     | 56 +++++++++++++++----
 src/meta/src/hummock/manager/transaction.rs   |  4 +-
 src/meta/src/hummock/manager/versioning.rs    |  2 +-
 .../compaction_group/hummock_version_ext.rs   |  4 +-
 src/storage/hummock_sdk/src/version.rs        | 10 ++--
 12 files changed, 116 insertions(+), 33 deletions(-)

diff --git a/proto/hummock.proto b/proto/hummock.proto
index f4565e2dd2dfa..36bc3a9660cdb 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -665,6 +665,10 @@ message RiseCtlUpdateCompactionConfigRequest {
       uint32 emergency_level0_sst_file_count = 25;
       // The emergency compaction limitations for the level0 sub level partition
       uint32 emergency_level0_sub_level_partition = 26;
+      // The limitation of the max sst count of the level0 to trigger the write stop
+      uint32 level0_stop_write_threshold_max_sst_count = 27;
+      // The limitation of the total sst size of the level0 to trigger the write stop
+      uint64 level0_stop_write_threshold_max_size = 28;
     }
   }
   repeated uint64 compaction_group_ids = 1;
@@ -873,6 +877,13 @@ message CompactionConfig {
 
   // The emergency compaction limitations for the level0 sub level partition
   optional uint32 emergency_level0_sub_level_partition = 26;
+
+  // The limitation of the max sst count of the level0 to trigger the write stop
+  optional uint32 level0_stop_write_threshold_max_sst_count = 27;
+  // The limitation of the total sst size of the level0 to trigger the write stop
+  optional uint64 level0_stop_write_threshold_max_size = 28;
+
+
 }
 
 message TableStats {
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index f4476ef837c65..a8d0b4e149b19 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -2217,6 +2217,8 @@ pub mod default {
         const DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MIN_SIZE: u64 = 4 * MB;
         const DEFAULT_EMERGENCY_LEVEL0_SST_FILE_COUNT: u32 = 2000; // > 50G / 32M = 1600
         const DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION: u32 = 256;
+        const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT: u32 = 10000; // 10000 * 32M = 320G
+        const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SIZE: u64 = 300 * 1024 * MB; // 300GB
 
         use crate::catalog::hummock::CompactionFilterFlag;
 
@@ -2307,6 +2309,14 @@ pub mod default {
         pub fn emergency_level0_sub_level_partition() -> u32 {
             DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION
         }
+
+        pub fn level0_stop_write_threshold_max_sst_count() -> u32 {
+            DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT
+        }
+
+        pub fn level0_stop_write_threshold_max_size() -> u64 {
+            DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SIZE
+        }
     }
 
     pub mod object_store_config {
diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs
index c3358043cdfe2..03a2a7e7e10a8 100644
--- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs
+++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs
@@ -71,6 +71,8 @@ pub fn build_compaction_config_vec(
     max_overlapping_level_size: Option<u64>,
     emergency_level0_sst_file_count: Option<u32>,
     emergency_level0_sub_level_partition: Option<u32>,
+    level0_stop_write_threshold_max_sst_count: Option<u32>,
+    level0_stop_write_threshold_max_size: Option<u64>,
 ) -> Vec<MutableConfig> {
     let mut configs = vec![];
     if let Some(c) = max_bytes_for_level_base {
@@ -139,6 +141,12 @@ pub fn build_compaction_config_vec(
     if let Some(c) = emergency_level0_sub_level_partition {
         configs.push(MutableConfig::EmergencyLevel0SubLevelPartition(c))
     }
+    if let Some(c) = level0_stop_write_threshold_max_sst_count {
+        configs.push(MutableConfig::Level0StopWriteThresholdMaxSstCount(c))
+    }
+    if let Some(c) = level0_stop_write_threshold_max_size {
+        configs.push(MutableConfig::Level0StopWriteThresholdMaxSize(c))
+    }
 
     configs
 }
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index c90d551634f4d..0bd77d2608614 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -200,6 +200,10 @@ enum HummockCommands {
         emergency_level0_sst_file_count: Option<u32>,
         #[clap(long)]
         emergency_level0_sub_level_partition: Option<u32>,
+        #[clap(long)]
+        level0_stop_write_threshold_max_sst_count: Option<u32>,
+        #[clap(long)]
+        level0_stop_write_threshold_max_size: Option<u64>,
     },
     /// Split given compaction group into two. Moves the given tables to the new group.
     SplitCompactionGroup {
@@ -611,6 +615,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
             max_overlapping_level_size,
             emergency_level0_sst_file_count,
             emergency_level0_sub_level_partition,
+            level0_stop_write_threshold_max_sst_count,
+            level0_stop_write_threshold_max_size,
         }) => {
             cmd_impl::hummock::update_compaction_config(
                 context,
@@ -646,6 +652,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
                     max_overlapping_level_size,
                     emergency_level0_sst_file_count,
                     emergency_level0_sub_level_partition,
+                    level0_stop_write_threshold_max_sst_count,
+                    level0_stop_write_threshold_max_size,
                 ),
             )
             .await?
diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs
index 5694c1bfe0234..0bc8b4234bcac 100644
--- a/src/meta/src/hummock/compaction/compaction_config.rs
+++ b/src/meta/src/hummock/compaction/compaction_config.rs
@@ -78,6 +78,12 @@ impl CompactionConfigBuilder {
                 emergency_level0_sub_level_partition: Some(
                     compaction_config::emergency_level0_sub_level_partition(),
                 ),
+                level0_stop_write_threshold_max_sst_count: Some(
+                    compaction_config::level0_stop_write_threshold_max_sst_count(),
+                ),
+                level0_stop_write_threshold_max_size: Some(
+                    compaction_config::level0_stop_write_threshold_max_size(),
+                ),
             },
         }
     }
diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs
index bcd514d05dc78..77d5c66c5042f 100644
--- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs
+++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs
@@ -247,11 +247,11 @@ impl HummockManager {
                             }
                         };
 
-                    let group_delta = GroupDelta::GroupConstruct(PbGroupConstruct {
+                    let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                         group_config: Some(config),
                         group_id,
                         ..Default::default()
-                    });
+                    }));
 
                     group_deltas.push(group_delta);
                 }
@@ -611,6 +611,12 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi
             MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
                 target.emergency_level0_sub_level_partition = Some(*c);
             }
+            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
+                target.level0_stop_write_threshold_max_sst_count = Some(*c);
+            }
+            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
+                target.level0_stop_write_threshold_max_size = Some(*c);
+            }
         }
     }
 }
diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs
index f346902bc242d..266718c2c48b5 100644
--- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs
+++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs
@@ -452,15 +452,17 @@ impl HummockManager {
             new_version_delta.group_deltas.insert(
                 new_compaction_group_id,
                 GroupDeltas {
-                    group_deltas: vec![GroupDelta::GroupConstruct(PbGroupConstruct {
-                        group_config: Some(config.clone()),
-                        group_id: new_compaction_group_id,
-                        parent_group_id,
-                        new_sst_start_id,
-                        table_ids: vec![],
-                        version: CompatibilityVersion::SplitGroupByTableId as i32, // for compatibility
-                        split_key: Some(split_key.into()),
-                    })],
+                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(
+                        PbGroupConstruct {
+                            group_config: Some(config.clone()),
+                            group_id: new_compaction_group_id,
+                            parent_group_id,
+                            new_sst_start_id,
+                            table_ids: vec![],
+                            version: CompatibilityVersion::SplitGroupByTableId as i32, // for compatibility
+                            split_key: Some(split_key.into()),
+                        }
+                    ))],
                 },
             );
             (new_compaction_group_id, config)
diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs
index a996787ef9bd3..88a731b6e84f0 100644
--- a/src/meta/src/hummock/manager/compaction/mod.rs
+++ b/src/meta/src/hummock/manager/compaction/mod.rs
@@ -1588,8 +1588,46 @@ pub fn check_cg_write_limit(
 ) -> WriteLimitType {
     let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
     let l0_sub_level_number = levels.l0.sub_levels.len();
+
+    // level count
     if threshold < l0_sub_level_number {
-        return WriteLimitType::WriteStop(l0_sub_level_number, threshold);
+        return WriteLimitType::WriteStop(format!(
+            "WriteStop(l0_sub_level_number: {}, threshold: {}) too many L0 sub levels",
+            l0_sub_level_number, threshold
+        ));
+    }
+
+    let threshold = compaction_config
+        .level0_stop_write_threshold_max_sst_count
+        .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
+        as usize;
+    let l0_sst_count = levels
+        .l0
+        .sub_levels
+        .iter()
+        .map(|l| l.table_infos.len())
+        .sum();
+    if threshold < l0_sst_count {
+        return WriteLimitType::WriteStop(format!(
+            "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst count",
+            l0_sst_count, threshold
+        ));
+    }
+
+    let threshold = compaction_config
+        .level0_stop_write_threshold_max_size
+        .unwrap_or(compaction_config::level0_stop_write_threshold_max_size());
+    let l0_size = levels
+        .l0
+        .sub_levels
+        .iter()
+        .map(|l| l.table_infos.iter().map(|t| t.sst_size).sum::<u64>())
+        .sum::<u64>();
+    if threshold < l0_size {
+        return WriteLimitType::WriteStop(format!(
+            "WriteStop(l0_size: {}, threshold: {}) too many L0 sst count",
+            l0_size, threshold
+        ));
     }
 
     WriteLimitType::Unlimited
@@ -1598,25 +1636,19 @@ pub fn check_cg_write_limit(
 pub enum WriteLimitType {
     Unlimited,
 
-    // (l0_level_count, threshold)
-    WriteStop(usize, usize),
+    WriteStop(String), // reason
 }
 
 impl WriteLimitType {
-    pub fn as_str(&self) -> String {
+    pub fn as_str(&self) -> &str {
         match self {
-            Self::Unlimited => "Unlimited".to_owned(),
-            Self::WriteStop(l0_level_count, threshold) => {
-                format!(
-                    "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
-                    l0_level_count, threshold
-                )
-            }
+            Self::Unlimited => "Unlimited",
+            Self::WriteStop(reason) => reason,
         }
     }
 
     pub fn is_write_stop(&self) -> bool {
-        matches!(self, Self::WriteStop(_, _))
+        matches!(self, Self::WriteStop(_))
     }
 }
 
diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs
index 335c8e8d157e5..668b15483669c 100644
--- a/src/meta/src/hummock/manager/transaction.rs
+++ b/src/meta/src/hummock/manager/transaction.rs
@@ -129,7 +129,7 @@ impl<'a> HummockVersionTransaction<'a> {
                 .group_deltas;
 
             #[expect(deprecated)]
-            group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
+            group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
                 group_config: Some(compaction_group.compaction_config().as_ref().clone()),
                 group_id: compaction_group.group_id(),
                 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
@@ -137,7 +137,7 @@ impl<'a> HummockVersionTransaction<'a> {
                 table_ids: vec![],
                 version: CompatibilityVersion::SplitGroupByTableId as i32,
                 split_key: None,
-            }));
+            })));
         }
 
         // Append SSTs to a new version.
diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs
index 07d99992e10b1..db8a0ebf53d4e 100644
--- a/src/meta/src/hummock/manager/versioning.rs
+++ b/src/meta/src/hummock/manager/versioning.rs
@@ -300,7 +300,7 @@ pub(super) fn calc_new_write_limits(
                         .iter()
                         .map(|table_id| table_id.table_id)
                         .collect(),
-                    reason: write_limit_type.as_str(),
+                    reason: write_limit_type.as_str().to_owned(),
                 },
             );
             continue;
diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
index 8ea85ec57dbe3..70567d5e19fa6 100644
--- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
+++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
@@ -1610,13 +1610,13 @@ mod tests {
                 (
                     2,
                     GroupDeltas {
-                        group_deltas: vec![GroupDelta::GroupConstruct(GroupConstruct {
+                        group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
                             group_config: Some(CompactionConfig {
                                 max_level: 6,
                                 ..Default::default()
                             }),
                             ..Default::default()
-                        })],
+                        }))],
                     },
                 ),
                 (
diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs
index 84cba3804a96c..da8b25867ca40 100644
--- a/src/storage/hummock_sdk/src/version.rs
+++ b/src/storage/hummock_sdk/src/version.rs
@@ -926,7 +926,7 @@ impl IntraLevelDelta {
 pub enum GroupDeltaCommon<T> {
     NewL0SubLevel(Vec<T>),
     IntraLevel(IntraLevelDeltaCommon<T>),
-    GroupConstruct(PbGroupConstruct),
+    GroupConstruct(Box<PbGroupConstruct>),
     GroupDestroy(PbGroupDestroy),
     GroupMerge(PbGroupMerge),
 }
@@ -943,7 +943,7 @@ where
                 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
             }
             Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
-                GroupDeltaCommon::GroupConstruct(pb_group_construct)
+                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct))
             }
             Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
                 GroupDeltaCommon::GroupDestroy(pb_group_destroy)
@@ -973,7 +973,7 @@ where
                 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
             },
             GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
-                delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct)),
+                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct)),
             },
             GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
                 delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
@@ -1003,7 +1003,7 @@ where
                 delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())),
             },
             GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta {
-                delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct.clone())),
+                delta_type: Some(PbDeltaType::GroupConstruct(*pb_group_construct.clone())),
             },
             GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
                 delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
@@ -1030,7 +1030,7 @@ where
                 GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta))
             }
             Some(PbDeltaType::GroupConstruct(pb_group_construct)) => {
-                GroupDeltaCommon::GroupConstruct(pb_group_construct.clone())
+                GroupDeltaCommon::GroupConstruct(Box::new(pb_group_construct.clone()))
             }
             Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
                 GroupDeltaCommon::GroupDestroy(*pb_group_destroy)