diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 3ae1012fd86d..30c8d2844638 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -133,7 +133,9 @@ pub fn new_picker( match compaction_options { CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( twcs_opts.max_active_window_runs, + twcs_opts.max_active_window_files, twcs_opts.max_inactive_window_runs, + twcs_opts.max_inactive_window_files, twcs_opts.time_window_seconds(), )) as Arc<_>, } diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index 902228623787..1df462004f8d 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -43,3 +43,29 @@ pub fn new_file_handle( file_purger, ) } + +pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec { + let file_purger = new_noop_file_purger(); + file_specs + .iter() + .map(|(start, end, size)| { + FileHandle::new( + FileMeta { + region_id: 0.into(), + file_id: FileId::random(), + time_range: ( + Timestamp::new_millisecond(*start), + Timestamp::new_millisecond(*end), + ), + level: 0, + file_size: *size, + available_indexes: Default::default(), + index_file_size: 0, + num_rows: 0, + num_row_groups: 0, + }, + file_purger.clone(), + ) + }) + .collect() +} diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 81ad5268501f..4bbad692f05f 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -14,7 +14,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; use common_telemetry::{debug, info}; use common_time::timestamp::TimeUnit; @@ -24,7 +24,7 @@ use common_time::Timestamp; use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; -use crate::compaction::run::{find_sorted_runs, reduce_runs}; +use crate::compaction::run::{find_sorted_runs, reduce_runs, Item}; use crate::compaction::{get_expired_ssts, CompactionOutput}; use crate::sst::file::{overlaps, FileHandle, FileId, Level}; use crate::sst::version::LevelMeta; @@ -33,31 +33,29 @@ const LEVEL_COMPACTED: Level = 1; /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction /// candidates. +#[derive(Debug)] pub struct TwcsPicker { max_active_window_runs: usize, + max_active_window_files: usize, max_inactive_window_runs: usize, + max_inactive_window_files: usize, time_window_seconds: Option, } -impl Debug for TwcsPicker { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TwcsPicker") - .field("max_active_window_runs", &self.max_active_window_runs) - .field("max_inactive_window_runs", &self.max_inactive_window_runs) - .finish() - } -} - impl TwcsPicker { pub fn new( max_active_window_runs: usize, + max_active_window_files: usize, max_inactive_window_runs: usize, + max_inactive_window_files: usize, time_window_seconds: Option, ) -> Self { Self { max_inactive_window_runs, max_active_window_runs, time_window_seconds, + max_active_window_files, + max_inactive_window_files, } } @@ -73,12 +71,15 @@ impl TwcsPicker { for (window, files) in time_windows { let sorted_runs = find_sorted_runs(&mut files.files); - let max_runs = if let Some(active_window) = active_window + let (max_runs, max_files) = if let Some(active_window) = active_window && *window == active_window { - self.max_active_window_runs + (self.max_active_window_runs, self.max_active_window_files) } else { - self.max_inactive_window_runs + ( + self.max_inactive_window_runs, + self.max_inactive_window_files, + ) }; // we only remove deletion markers once no file in current window overlaps with any other window. @@ -87,16 +88,33 @@ impl TwcsPicker { if found_runs > max_runs { let files_to_compact = reduce_runs(sorted_runs, max_runs); - info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}", active_window, *window,max_runs, found_runs, files_to_compact.len()); + info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}, remove deletion markers: {}", active_window, *window,max_runs, found_runs, files_to_compact.len(), filter_deleted); for inputs in files_to_compact { output.push(CompactionOutput { output_file_id: FileId::random(), output_level: LEVEL_COMPACTED, // always compact to l1 inputs, filter_deleted, - output_time_range: None, // we do not enforce output time range in twcs compactions.}); + output_time_range: None, // we do not enforce output time range in twcs compactions. }); } + } else if files.files.len() > max_files { + debug!( + "Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}", + *window, + active_window, + max_files, + files.files.len() + ); + // Files in window exceeds file num limit + let to_merge = enforce_file_num(&files.files, max_files); + output.push(CompactionOutput { + output_file_id: FileId::random(), + output_level: LEVEL_COMPACTED, // always compact to l1 + inputs: to_merge, + filter_deleted, + output_time_range: None, + }); } else { debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs); } @@ -105,6 +123,31 @@ impl TwcsPicker { } } +/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses +/// the solution with minimum overhead according to files sizes to be merged. +/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs. +/// `runs` must be sorted according to time ranges. +fn enforce_file_num(files: &[T], max_file_num: usize) -> Vec { + debug_assert!(files.len() > max_file_num); + let to_merge = files.len() - max_file_num + 1; + let mut min_penalty = usize::MAX; + let mut min_idx = 0; + + for idx in 0..=(files.len() - to_merge) { + let current_penalty: usize = files + .iter() + .skip(idx) + .take(to_merge) + .map(|f| f.size()) + .sum(); + if current_penalty < min_penalty { + min_penalty = current_penalty; + min_idx = idx; + } + } + files.iter().skip(min_idx).take(to_merge).cloned().collect() +} + impl Picker for TwcsPicker { fn pick(&self, compaction_region: &CompactionRegion) -> Option { let region_id = compaction_region.region_id; @@ -264,7 +307,7 @@ mod tests { use std::collections::HashSet; use super::*; - use crate::compaction::test_util::new_file_handle; + use crate::compaction::test_util::{new_file_handle, new_file_handles}; use crate::sst::file::Level; #[test] @@ -482,7 +525,8 @@ mod tests { let mut windows = assign_to_windows(self.input_files.iter(), self.window_size); let active_window = find_latest_window_in_seconds(self.input_files.iter(), self.window_size); - let output = TwcsPicker::new(4, 1, None).build_output(&mut windows, active_window); + let output = TwcsPicker::new(4, usize::MAX, 1, usize::MAX, None) + .build_output(&mut windows, active_window); let output = output .iter() @@ -514,6 +558,43 @@ mod tests { output_level: Level, } + fn check_enforce_file_num( + input_files: &[(i64, i64, u64)], + max_file_num: usize, + files_to_merge: &[(i64, i64)], + ) { + let mut files = new_file_handles(input_files); + // ensure sorted + find_sorted_runs(&mut files); + let mut to_merge = enforce_file_num(&files, max_file_num); + to_merge.sort_unstable_by_key(|f| f.time_range().0); + assert_eq!( + files_to_merge.to_vec(), + to_merge + .iter() + .map(|f| { + let (start, end) = f.time_range(); + (start.value(), end.value()) + }) + .collect::>() + ); + } + + #[test] + fn test_enforce_file_num() { + check_enforce_file_num( + &[(0, 300, 2), (100, 200, 1), (200, 400, 1)], + 2, + &[(100, 200), (200, 400)], + ); + + check_enforce_file_num( + &[(0, 300, 200), (100, 200, 100), (200, 400, 100)], + 1, + &[(0, 300), (100, 200), (200, 400)], + ); + } + #[test] fn test_build_twcs_output() { let file_ids = (0..4).map(|_| FileId::random()).collect::>(); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 9de4a0ddf572..e19f95088c46 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -220,7 +220,9 @@ async fn test_compaction_region_with_overlapping_delete_all() { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") .insert_option("compaction.twcs.max_active_window_runs", "2") + .insert_option("compaction.twcs.max_active_window_files", "2") .insert_option("compaction.twcs.max_inactive_window_runs", "2") + .insert_option("compaction.twcs.max_inactive_window_files", "2") .insert_option("compaction.twcs.time_window", "1h") .build(); diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 7a28cee977d6..71882fbfc130 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -186,9 +186,15 @@ pub struct TwcsOptions { /// Max num of sorted runs that can be kept in active writing time window. #[serde_as(as = "DisplayFromStr")] pub max_active_window_runs: usize, - /// Max num of files that can be kept in inactive time window. + /// Max num of files in the active window. + #[serde_as(as = "DisplayFromStr")] + pub max_active_window_files: usize, + /// Max num of sorted runs that can be kept in inactive time windows. #[serde_as(as = "DisplayFromStr")] pub max_inactive_window_runs: usize, + /// Max num of files in inactive time windows. + #[serde_as(as = "DisplayFromStr")] + pub max_inactive_window_files: usize, /// Compaction time window defined when creating tables. #[serde(with = "humantime_serde")] pub time_window: Option, @@ -217,7 +223,9 @@ impl Default for TwcsOptions { fn default() -> Self { Self { max_active_window_runs: 4, + max_active_window_files: 4, max_inactive_window_runs: 1, + max_inactive_window_files: 1, time_window: None, remote_compaction: false, } @@ -576,7 +584,9 @@ mod tests { let map = make_map(&[ ("ttl", "7d"), ("compaction.twcs.max_active_window_runs", "8"), + ("compaction.twcs.max_active_window_files", "11"), ("compaction.twcs.max_inactive_window_runs", "2"), + ("compaction.twcs.max_inactive_window_files", "3"), ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ("compaction.twcs.remote_compaction", "false"), @@ -599,7 +609,9 @@ mod tests { ttl: Some(Duration::from_secs(3600 * 24 * 7)), compaction: CompactionOptions::Twcs(TwcsOptions { max_active_window_runs: 8, + max_active_window_files: 11, max_inactive_window_runs: 2, + max_inactive_window_files: 3, time_window: Some(Duration::from_secs(3600 * 2)), remote_compaction: false, }), @@ -628,7 +640,9 @@ mod tests { ttl: Some(Duration::from_secs(3600 * 24 * 7)), compaction: CompactionOptions::Twcs(TwcsOptions { max_active_window_runs: 8, + max_active_window_files: usize::MAX, max_inactive_window_runs: 2, + max_inactive_window_files: usize::MAX, time_window: Some(Duration::from_secs(3600 * 2)), remote_compaction: false, }), @@ -663,7 +677,9 @@ mod tests { "compaction": { "compaction.type": "twcs", "compaction.twcs.max_active_window_runs": "8", + "compaction.twcs.max_active_window_files": "11", "compaction.twcs.max_inactive_window_runs": "2", + "compaction.twcs.max_inactive_window_files": "7", "compaction.twcs.time_window": "2h" }, "storage": "S3", @@ -689,7 +705,9 @@ mod tests { ttl: Some(Duration::from_secs(3600 * 24 * 7)), compaction: CompactionOptions::Twcs(TwcsOptions { max_active_window_runs: 8, + max_active_window_files: 11, max_inactive_window_runs: 2, + max_inactive_window_files: 7, time_window: Some(Duration::from_secs(3600 * 2)), remote_compaction: false, }), diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index 9252c970b37f..98ceb6758552 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -28,7 +28,9 @@ pub fn is_mito_engine_option_key(key: &str) -> bool { "ttl", "compaction.type", "compaction.twcs.max_active_window_runs", + "compaction.twcs.max_active_window_files", "compaction.twcs.max_inactive_window_runs", + "compaction.twcs.max_inactive_window_files", "compaction.twcs.time_window", "compaction.twcs.remote_compaction", "storage",