diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 0f33471b21ac..3cea492071c7 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -240,7 +240,11 @@ impl CompactionScheduler { request: CompactionRequest, options: compact_request::Options, ) -> Result<()> { - let picker = new_picker(options.clone(), &request.current_version.options.compaction); + let picker = new_picker( + &options, + &request.current_version.options.compaction, + request.current_version.options.append_mode, + ); let region_id = request.region_id(); let CompactionRequest { engine_config, @@ -500,7 +504,7 @@ pub struct CompactionOutput { pub inputs: Vec, /// Whether to remove deletion markers. pub filter_deleted: bool, - /// Compaction output time range. + /// Compaction output time range. Only windowed compaction specifies output time range. pub output_time_range: Option, } diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 9c8c0e02bd1e..12b9dd5fefb6 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -38,7 +38,7 @@ use crate::memtable::MemtableBuilderProvider; use crate::read::Source; use crate::region::opener::new_manifest_dir; use crate::region::options::RegionOptions; -use crate::region::version::{VersionBuilder, VersionControl, VersionRef}; +use crate::region::version::{VersionBuilder, VersionRef}; use crate::region::ManifestContext; use crate::region::RegionState::Writable; use crate::schedule::scheduler::LocalScheduler; @@ -164,8 +164,7 @@ pub async fn open_compaction_region( .compaction_time_window(manifest.compaction_time_window) .options(req.region_options.clone()) .build(); - let version_control = Arc::new(VersionControl::new(version)); - version_control.current().version + Arc::new(version) }; Ok(CompactionRegion { @@ -395,8 +394,9 @@ impl Compactor for DefaultCompactor { ) -> Result<()> { let picker_output = { let picker_output = new_picker( - 
compact_request_options, + &compact_request_options, &compaction_region.region_options.compaction, + compaction_region.region_options.append_mode, ) .pick(compaction_region); diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 30c8d2844638..9397c2bf6470 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -119,10 +119,11 @@ impl PickerOutput { /// Create a new picker based on the compaction request options and compaction options. pub fn new_picker( - compact_request_options: compact_request::Options, + compact_request_options: &compact_request::Options, compaction_options: &CompactionOptions, + append_mode: bool, ) -> Arc { - if let compact_request::Options::StrictWindow(window) = &compact_request_options { + if let compact_request::Options::StrictWindow(window) = compact_request_options { let window = if window.window_seconds == 0 { None } else { @@ -131,13 +132,15 @@ pub fn new_picker( Arc::new(WindowedCompactionPicker::new(window)) as Arc<_> } else { match compaction_options { - CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( - twcs_opts.max_active_window_runs, - twcs_opts.max_active_window_files, - twcs_opts.max_inactive_window_runs, - twcs_opts.max_inactive_window_files, - twcs_opts.time_window_seconds(), - )) as Arc<_>, + CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker { + max_active_window_runs: twcs_opts.max_active_window_runs, + max_active_window_files: twcs_opts.max_active_window_files, + max_inactive_window_runs: twcs_opts.max_inactive_window_runs, + max_inactive_window_files: twcs_opts.max_inactive_window_files, + time_window_seconds: twcs_opts.time_window_seconds(), + max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()), + append_mode, + }) as Arc<_>, } } } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 4bbad692f05f..c6d2a112aad4 100644 --- a/src/mito2/src/compaction/twcs.rs +++ 
b/src/mito2/src/compaction/twcs.rs @@ -35,30 +35,23 @@ const LEVEL_COMPACTED: Level = 1; /// candidates. #[derive(Debug)] pub struct TwcsPicker { - max_active_window_runs: usize, - max_active_window_files: usize, - max_inactive_window_runs: usize, - max_inactive_window_files: usize, - time_window_seconds: Option, + /// Max allowed sorted runs in active window. + pub max_active_window_runs: usize, + /// Max allowed files in active window. + pub max_active_window_files: usize, + /// Max allowed sorted runs in inactive windows. + pub max_inactive_window_runs: usize, + /// Max allowed files in inactive windows. + pub max_inactive_window_files: usize, + /// Compaction time window in seconds. + pub time_window_seconds: Option, + /// Max allowed compaction output file size. + pub max_output_file_size: Option, + /// Whether the target region is in append mode. + pub append_mode: bool, } impl TwcsPicker { - pub fn new( - max_active_window_runs: usize, - max_active_window_files: usize, - max_inactive_window_runs: usize, - max_inactive_window_files: usize, - time_window_seconds: Option, - ) -> Self { - Self { - max_inactive_window_runs, - max_active_window_runs, - time_window_seconds, - max_active_window_files, - max_inactive_window_files, - } - } - /// Builds compaction output from files. /// For active writing window, we allow for at most `max_active_window_runs` files to alleviate /// fragmentation. For other windows, we allow at most 1 file at each window. @@ -82,47 +75,114 @@ impl TwcsPicker { ) }; - // we only remove deletion markers once no file in current window overlaps with any other window. let found_runs = sorted_runs.len(); - let filter_deleted = !files.overlapping && (found_runs == 1 || max_runs == 1); + // We only remove deletion markers once no file in current window overlaps with any other window + // and region is not in append mode. 
+ let filter_deleted = + !files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode; - if found_runs > max_runs { + let inputs = if found_runs > max_runs { let files_to_compact = reduce_runs(sorted_runs, max_runs); - info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}, remove deletion markers: {}", active_window, *window,max_runs, found_runs, files_to_compact.len(), filter_deleted); - for inputs in files_to_compact { - output.push(CompactionOutput { - output_file_id: FileId::random(), - output_level: LEVEL_COMPACTED, // always compact to l1 - inputs, - filter_deleted, - output_time_range: None, // we do not enforce output time range in twcs compactions. - }); - } + let files_to_compact_len = files_to_compact.len(); + info!( + "Building compaction output, active window: {:?}, \ + current window: {}, \ + max runs: {}, \ + found runs: {}, \ + output size: {}, \ + max output size: {:?}, \ + remove deletion markers: {}", + active_window, + *window, + max_runs, + found_runs, + files_to_compact_len, + self.max_output_file_size, + filter_deleted + ); + files_to_compact } else if files.files.len() > max_files { - debug!( - "Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}", + info!( + "Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}", *window, active_window, max_files, - files.files.len() + files.files.len(), + self.max_output_file_size, + filter_deleted, ); // Files in window exceeds file num limit - let to_merge = enforce_file_num(&files.files, max_files); + vec![enforce_file_num(&files.files, max_files)] + } else { + debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs); + continue; + }; + + let split_inputs = if !filter_deleted + && let Some(max_output_file_size) = 
self.max_output_file_size + { + let len_before_split = inputs.len(); + let maybe_split = enforce_max_output_size(inputs, max_output_file_size); + if maybe_split.len() != len_before_split { + info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split); + } + maybe_split + } else { + inputs + }; + + for input in split_inputs { + debug_assert!(input.len() > 1); output.push(CompactionOutput { output_file_id: FileId::random(), output_level: LEVEL_COMPACTED, // always compact to l1 - inputs: to_merge, + inputs: input, filter_deleted, - output_time_range: None, + output_time_range: None, // we do not enforce output time range in twcs compactions. }); - } else { - debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs); } } output } } +/// Limits the size of compaction output in a naive manner. +/// todo(hl): we can find the output file size more precisely by checking the time range +/// of each row group and adding the sizes of those non-overlapping row groups. But for now +/// we'd better not expose the SST details at this level. +fn enforce_max_output_size( + inputs: Vec>, + max_output_file_size: u64, +) -> Vec> { + inputs + .into_iter() + .flat_map(|input| { + debug_assert!(input.len() > 1); + let estimated_output_size = input.iter().map(|f| f.size()).sum::(); + if estimated_output_size < max_output_file_size { + // total file size does not exceed the threshold, just return the original input. 
+ return vec![input]; + } + let mut splits = vec![]; + let mut new_input = vec![]; + let mut new_input_size = 0; + for f in input { + if new_input_size + f.size() > max_output_file_size { + splits.push(std::mem::take(&mut new_input)); + new_input_size = 0; + } + new_input_size += f.size(); + new_input.push(f); + } + if !new_input.is_empty() { + splits.push(new_input); + } + splits + }) + .filter(|p| p.len() > 1) + .collect() +} + /// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses /// the solution with minimum overhead according to files sizes to be merged. /// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs. @@ -305,10 +365,12 @@ fn find_latest_window_in_seconds<'a>( #[cfg(test)] mod tests { use std::collections::HashSet; + use std::sync::Arc; use super::*; use crate::compaction::test_util::{new_file_handle, new_file_handles}; - use crate::sst::file::Level; + use crate::sst::file::{FileMeta, Level}; + use crate::test_util::NoopFilePurger; #[test] fn test_get_latest_window_in_seconds() { @@ -525,8 +587,16 @@ mod tests { let mut windows = assign_to_windows(self.input_files.iter(), self.window_size); let active_window = find_latest_window_in_seconds(self.input_files.iter(), self.window_size); - let output = TwcsPicker::new(4, usize::MAX, 1, usize::MAX, None) - .build_output(&mut windows, active_window); + let output = TwcsPicker { + max_active_window_runs: 4, + max_active_window_files: usize::MAX, + max_inactive_window_runs: 1, + max_inactive_window_files: usize::MAX, + time_window_seconds: None, + max_output_file_size: None, + append_mode: false, + } + .build_output(&mut windows, active_window); let output = output .iter() @@ -641,5 +711,43 @@ mod tests { .check(); } + fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec { + inputs + .iter() + .map(|(start, end, size)| { + FileHandle::new( + FileMeta { + region_id: Default::default(), + file_id: Default::default(), + time_range: ( 
+ Timestamp::new_millisecond(*start), + Timestamp::new_millisecond(*end), + ), + level: 0, + file_size: *size, + available_indexes: Default::default(), + index_file_size: 0, + num_rows: 0, + num_row_groups: 0, + }, + Arc::new(NoopFilePurger), + ) + }) + .collect() + } + + #[test] + fn test_limit_output_size() { + let mut files = make_file_handles(&[(1, 1, 1)].repeat(6)); + let runs = find_sorted_runs(&mut files); + assert_eq!(6, runs.len()); + let files_to_merge = reduce_runs(runs, 2); + + let enforced = enforce_max_output_size(files_to_merge, 2); + assert_eq!(2, enforced.len()); + assert_eq!(2, enforced[0].len()); + assert_eq!(2, enforced[1].len()); + } + // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected. } diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index cc866e550210..4abc5925b705 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -204,6 +204,8 @@ pub struct TwcsOptions { /// Compaction time window defined when creating tables. #[serde(with = "humantime_serde")] pub time_window: Option, + /// Max allowed compaction output file size. + pub max_output_file_size: Option, /// Whether to use remote compaction. 
#[serde_as(as = "DisplayFromStr")] pub remote_compaction: bool, @@ -236,6 +238,7 @@ impl Default for TwcsOptions { max_inactive_window_runs: 1, max_inactive_window_files: 1, time_window: None, + max_output_file_size: None, remote_compaction: false, fallback_to_local: true, } @@ -597,6 +600,7 @@ mod tests { ("compaction.twcs.max_active_window_files", "11"), ("compaction.twcs.max_inactive_window_runs", "2"), ("compaction.twcs.max_inactive_window_files", "3"), + ("compaction.twcs.max_output_file_size", "1GB"), ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ("compaction.twcs.remote_compaction", "false"), @@ -624,6 +628,7 @@ mod tests { max_inactive_window_runs: 2, max_inactive_window_files: 3, time_window: Some(Duration::from_secs(3600 * 2)), + max_output_file_size: Some(ReadableSize::gb(1)), remote_compaction: false, fallback_to_local: true, }), @@ -656,6 +661,7 @@ mod tests { max_inactive_window_runs: 2, max_inactive_window_files: usize::MAX, time_window: Some(Duration::from_secs(3600 * 2)), + max_output_file_size: None, remote_compaction: false, fallback_to_local: true, }), @@ -693,6 +699,7 @@ mod tests { "compaction.twcs.max_active_window_files": "11", "compaction.twcs.max_inactive_window_runs": "2", "compaction.twcs.max_inactive_window_files": "7", + "compaction.twcs.max_output_file_size": "7MB", "compaction.twcs.time_window": "2h" }, "storage": "S3", @@ -722,6 +729,7 @@ mod tests { max_inactive_window_runs: 2, max_inactive_window_files: 7, time_window: Some(Duration::from_secs(3600 * 2)), + max_output_file_size: Some(ReadableSize::mb(7)), remote_compaction: false, fallback_to_local: true, }), diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index de81bd1a2144..8b7eb420d2d0 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -370,12 +370,7 @@ impl<'a> ParserContext<'a> { .map(parse_option_string) .collect::>>()?; for key in options.keys() { - ensure!( - 
validate_table_option(key), - InvalidTableOptionSnafu { - key: key.to_string() - } - ); + ensure!(validate_table_option(key), InvalidTableOptionSnafu { key }); } Ok(options.into()) } diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index e641a1d2fcda..0e0f3fdac790 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -33,6 +33,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool { "compaction.twcs.max_active_window_files", "compaction.twcs.max_inactive_window_runs", "compaction.twcs.max_inactive_window_files", + "compaction.twcs.max_output_file_size", "compaction.twcs.time_window", "compaction.twcs.remote_compaction", "compaction.twcs.fallback_to_local",