Skip to content

Commit

Permalink
feat(compaction): Limit the size of the new overlapping level (#19277)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Jan 22, 2025
1 parent 11a58d7 commit e49f440
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 35 deletions.
1 change: 1 addition & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ enum ComputeCommands {
ShowConfig { host: String },
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
/// list latest Hummock version on meta node
Expand Down
99 changes: 94 additions & 5 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::default::compaction_config;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
Expand Down Expand Up @@ -118,7 +120,7 @@ impl HummockManager {
let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
let mut new_table_ids = HashMap::new();
let mut new_compaction_groups = HashMap::new();
let mut new_compaction_groups = Vec::new();
let mut compaction_group_manager_txn = None;
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

Expand Down Expand Up @@ -171,8 +173,11 @@ impl HummockManager {
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
new_compaction_groups
.insert(new_compaction_group_id, compaction_group_config.clone());
let new_compaction_group = CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
};
new_compaction_groups.push(new_compaction_group);
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
Expand All @@ -196,12 +201,35 @@ impl HummockManager {
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
// fill compaction_groups
let mut group_id_to_config = HashMap::new();
if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
for cg_id in &modified_compaction_groups {
let compaction_group = compaction_group_manager
.get(cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
} else {
let compaction_group_manager = self.compaction_group_manager.read().await;
for cg_id in &modified_compaction_groups {
let compaction_group = compaction_group_manager
.try_get_compaction_group_config(*cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
}

let group_id_to_sub_levels =
rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

let time_travel_delta = version.pre_commit_epoch(
&tables_to_commit,
new_compaction_groups,
commit_sstables,
group_id_to_sub_levels,
&new_table_ids,
new_table_watermarks,
change_log_delta,
Expand Down Expand Up @@ -340,6 +368,7 @@ impl HummockManager {
) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
let mut new_sst_id_number = 0;
let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
for commit_sst in sstables {
let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
for table_id in &commit_sst.sst_info.table_ids {
Expand Down Expand Up @@ -408,6 +437,12 @@ impl HummockManager {
}
}

// order check
for ssts in commit_sstables.values() {
let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
}

Ok(commit_sstables)
}
}
Expand All @@ -432,3 +467,57 @@ fn on_handle_add_new_table(

Ok(())
}

/// Rewrite the commit sstables to sub-levels based on the compaction group config.
/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead.
fn rewrite_commit_sstables_to_sub_level(
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
for (group_id, inserted_table_infos) in commit_sstables {
let config = group_id_to_config
.get(&group_id)
.expect("compaction group should exist");

let mut accumulated_size = 0;
let mut ssts = vec![];
let sub_level_size_limit = config
.max_overlapping_level_size
.unwrap_or(compaction_config::max_overlapping_level_size());

let level = overlapping_sstables.entry(group_id).or_default();

for sst in inserted_table_infos {
accumulated_size += sst.sst_size;
ssts.push(sst);
if accumulated_size > sub_level_size_limit {
level.push(ssts);

// reset the accumulated size and ssts
accumulated_size = 0;
ssts = vec![];
}
}

if !ssts.is_empty() {
level.push(ssts);
}

// The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
level.reverse();
}

overlapping_sstables
}

fn is_ordered_subset(vec_1: &Vec<u64>, vec_2: &Vec<u64>) -> bool {
let mut vec_2_iter = vec_2.iter().peekable();
for item in vec_1 {
if vec_2_iter.peek() == Some(&item) {
vec_2_iter.next();
}
}

vec_2_iter.peek().is_none()
}
54 changes: 26 additions & 28 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand All @@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
HummockVersionStats, StateTableInfoDelta,
CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
StateTableInfoDelta,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use crate::hummock::model::CompactionGroup;
use crate::manager::NotificationManager;
use crate::model::{
InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
Expand Down Expand Up @@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> {
pub(super) fn pre_commit_epoch(
&mut self,
tables_to_commit: &HashMap<TableId, u64>,
new_compaction_groups: HashMap<CompactionGroupId, Arc<CompactionConfig>>,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_compaction_groups: Vec<CompactionGroup>,
group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
new_table_ids: &HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
Expand All @@ -121,38 +121,36 @@ impl<'a> HummockVersionTransaction<'a> {
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

for (compaction_group_id, compaction_group_config) in new_compaction_groups {
{
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some((*compaction_group_config).clone()),
group_id: compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}
for compaction_group in &new_compaction_groups {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group.group_id())
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some(compaction_group.compaction_config().as_ref().clone()),
group_id: compaction_group.group_id(),
parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);

group_deltas.push(group_delta);
for sub_level in sub_levels {
group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
}
}

// update state table info
Expand Down
128 changes: 128 additions & 0 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use risingwave_hummock_sdk::table_watermark::{
TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo};
use risingwave_meta::hummock::test_utils::get_compaction_group_id_by_table_id;
use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
Expand Down Expand Up @@ -2841,3 +2842,130 @@ async fn test_commit_multi_epoch() {
assert_eq!(info.committed_epoch, epoch3);
}
}

#[tokio::test]
async fn test_commit_with_large_size() {
let test_env = prepare_hummock_test_env().await;
let context_id = test_env.meta_client.context_id();
let existing_table_id = TableId::new(1);
let initial_epoch = INVALID_EPOCH;

let commit_epoch =
|epoch, ssts: Vec<SstableInfo>, new_table_fragment_infos, tables_to_commit: &[TableId]| {
let manager = &test_env.manager;
let tables_to_commit = tables_to_commit
.iter()
.map(|table_id| (*table_id, epoch))
.collect();
let sst_to_context = ssts.iter().map(|sst| (sst.object_id, context_id)).collect();

let sstables = ssts
.into_iter()
.map(|sst| LocalSstableInfo {
table_stats: sst
.table_ids
.iter()
.map(|&table_id| {
(
table_id,
TableStats {
total_compressed_size: 10,
..Default::default()
},
)
})
.collect(),
sst_info: sst,
created_at: u64::MAX,
})
.collect_vec();

async move {
manager
.commit_epoch(CommitEpochInfo {
new_table_watermarks: Default::default(),
sst_to_context,
sstables,
new_table_fragment_infos,
change_log_delta: Default::default(),
tables_to_commit,
})
.await
.unwrap();
}
};

let epoch1 = initial_epoch.next_epoch();
let sst1_epoch1 = SstableInfo {
sst_id: 11,
object_id: 1,
table_ids: vec![existing_table_id.table_id],
file_size: 512 << 20,
sst_size: 512 << 20,
..Default::default()
};

let sst1_epoch2 = SstableInfo {
sst_id: 12,
object_id: 2,
table_ids: vec![existing_table_id.table_id],
file_size: 512 << 20,
sst_size: 512 << 20,
..Default::default()
};

let sst1_epoch3 = SstableInfo {
sst_id: 13,
object_id: 3,
table_ids: vec![existing_table_id.table_id],
file_size: 512 << 20,
sst_size: 512 << 20,
..Default::default()
};

commit_epoch(
epoch1,
vec![
sst1_epoch3.clone(),
sst1_epoch2.clone(),
sst1_epoch1.clone(),
],
vec![NewTableFragmentInfo::NewCompactionGroup {
table_ids: HashSet::from_iter([existing_table_id]),
}],
&[existing_table_id],
)
.await;

let cg_id =
get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id())
.await;

let l0_sub_levels = test_env
.manager
.get_current_version()
.await
.levels
.get(&cg_id)
.unwrap()
.l0
.clone();

println!("l0_sub_levels {:?}", l0_sub_levels.sub_levels);
assert_eq!(3, l0_sub_levels.sub_levels.len());
assert_eq!(1, l0_sub_levels.sub_levels[0].table_infos.len());
assert_eq!(
sst1_epoch1.object_id,
l0_sub_levels.sub_levels[0].table_infos[0].object_id
);
assert_eq!(1, l0_sub_levels.sub_levels[1].table_infos.len());
assert_eq!(
sst1_epoch2.object_id,
l0_sub_levels.sub_levels[1].table_infos[0].object_id
);
assert_eq!(1, l0_sub_levels.sub_levels[2].table_infos.len());
assert_eq!(
sst1_epoch3.object_id,
l0_sub_levels.sub_levels[2].table_infos[0].object_id
);
}
Loading

0 comments on commit e49f440

Please sign in to comment.