Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): limit compaction output file size #4754

Merged
merged 5 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,11 @@ impl CompactionScheduler {
request: CompactionRequest,
options: compact_request::Options,
) -> Result<()> {
let picker = new_picker(options.clone(), &request.current_version.options.compaction);
let picker = new_picker(
&options,
&request.current_version.options.compaction,
request.current_version.options.append_mode,
);
let region_id = request.region_id();
let CompactionRequest {
engine_config,
Expand Down Expand Up @@ -500,7 +504,7 @@ pub struct CompactionOutput {
pub inputs: Vec<FileHandle>,
/// Whether to remove deletion markers.
pub filter_deleted: bool,
/// Compaction output time range.
/// Compaction output time range. Only windowed compaction specifies output time range.
pub output_time_range: Option<TimestampRange>,
}

Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::memtable::MemtableBuilderProvider;
use crate::read::Source;
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionRef};
use crate::region::version::{VersionBuilder, VersionRef};
use crate::region::ManifestContext;
use crate::region::RegionState::Writable;
use crate::schedule::scheduler::LocalScheduler;
Expand Down Expand Up @@ -164,8 +164,7 @@ pub async fn open_compaction_region(
.compaction_time_window(manifest.compaction_time_window)
.options(req.region_options.clone())
.build();
let version_control = Arc::new(VersionControl::new(version));
version_control.current().version
Arc::new(version)
};

Ok(CompactionRegion {
Expand Down Expand Up @@ -395,8 +394,9 @@ impl Compactor for DefaultCompactor {
) -> Result<()> {
let picker_output = {
let picker_output = new_picker(
compact_request_options,
&compact_request_options,
&compaction_region.region_options.compaction,
compaction_region.region_options.append_mode,
)
.pick(compaction_region);

Expand Down
21 changes: 12 additions & 9 deletions src/mito2/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,11 @@ impl PickerOutput {

/// Create a new picker based on the compaction request options and compaction options.
pub fn new_picker(
compact_request_options: compact_request::Options,
compact_request_options: &compact_request::Options,
compaction_options: &CompactionOptions,
append_mode: bool,
) -> Arc<dyn Picker> {
if let compact_request::Options::StrictWindow(window) = &compact_request_options {
if let compact_request::Options::StrictWindow(window) = compact_request_options {
let window = if window.window_seconds == 0 {
None
} else {
Expand All @@ -131,13 +132,15 @@ pub fn new_picker(
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
} else {
match compaction_options {
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_runs,
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_runs,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds(),
)) as Arc<_>,
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
max_active_window_runs: twcs_opts.max_active_window_runs,
max_active_window_files: twcs_opts.max_active_window_files,
max_inactive_window_runs: twcs_opts.max_inactive_window_runs,
max_inactive_window_files: twcs_opts.max_inactive_window_files,
time_window_seconds: twcs_opts.time_window_seconds(),
max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
append_mode,
}) as Arc<_>,
}
}
}
Expand Down
198 changes: 153 additions & 45 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,23 @@ const LEVEL_COMPACTED: Level = 1;
/// candidates.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Max allowed sorted runs in active window.
    pub max_active_window_runs: usize,
    /// Max allowed files in active window.
    pub max_active_window_files: usize,
    /// Max allowed sorted runs in inactive windows.
    pub max_inactive_window_runs: usize,
    /// Max allowed files in inactive windows.
    pub max_inactive_window_files: usize,
    /// Compaction time window in seconds.
    pub time_window_seconds: Option<i64>,
    /// Max allowed compaction output file size.
    pub max_output_file_size: Option<u64>,
    /// Whether the target region is in append mode.
    pub append_mode: bool,
}

impl TwcsPicker {
pub fn new(
max_active_window_runs: usize,
max_active_window_files: usize,
max_inactive_window_runs: usize,
max_inactive_window_files: usize,
time_window_seconds: Option<i64>,
) -> Self {
Self {
max_inactive_window_runs,
max_active_window_runs,
time_window_seconds,
max_active_window_files,
max_inactive_window_files,
}
}

/// Builds compaction output from files.
/// For active writing window, we allow for at most `max_active_window_runs` files to alleviate
/// fragmentation. For other windows, we allow at most 1 file at each window.
Expand All @@ -82,47 +75,114 @@ impl TwcsPicker {
)
};

// we only remove deletion markers once no file in current window overlaps with any other window.
let found_runs = sorted_runs.len();
let filter_deleted = !files.overlapping && (found_runs == 1 || max_runs == 1);
// We only remove deletion markers once no file in current window overlaps with any other window
// and region is not in append mode.
let filter_deleted =
!files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode;

if found_runs > max_runs {
let inputs = if found_runs > max_runs {
let files_to_compact = reduce_runs(sorted_runs, max_runs);
info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}, remove deletion markers: {}", active_window, *window,max_runs, found_runs, files_to_compact.len(), filter_deleted);
for inputs in files_to_compact {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: LEVEL_COMPACTED, // always compact to l1
inputs,
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
}
let files_to_compact_len = files_to_compact.len();
info!(
"Building compaction output, active window: {:?}, \
current window: {}, \
max runs: {}, \
found runs: {}, \
output size: {}, \
max output size: {:?}, \
remove deletion markers: {}",
active_window,
*window,
max_runs,
found_runs,
files_to_compact_len,
self.max_output_file_size,
filter_deleted
);
files_to_compact
} else if files.files.len() > max_files {
debug!(
"Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}",
info!(
"Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}",
*window,
active_window,
max_files,
files.files.len()
files.files.len(),
self.max_output_file_size,
filter_deleted,
);
// Files in window exceeds file num limit
let to_merge = enforce_file_num(&files.files, max_files);
vec![enforce_file_num(&files.files, max_files)]
} else {
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
continue;
};

let split_inputs = if !filter_deleted
&& let Some(max_output_file_size) = self.max_output_file_size
{
let len_before_split = inputs.len();
let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
if maybe_split.len() != len_before_split {
info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
}
maybe_split
} else {
inputs
};

for input in split_inputs {
debug_assert!(input.len() > 1);
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: LEVEL_COMPACTED, // always compact to l1
inputs: to_merge,
inputs: input,
filter_deleted,
output_time_range: None,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
} else {
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
}
}
output
}
}

/// Limits the size of compaction output in a naive manner.
///
/// Each input group whose total file size reaches `max_output_file_size` is split into
/// smaller groups of *consecutive* files so that each resulting merge stays below the
/// threshold; consecutiveness is preserved so splitting cannot create overlapping outputs.
/// Groups reduced to a single file are dropped entirely (merging one file is a no-op),
/// which means a file larger than the threshold by itself is simply left out of this
/// compaction round.
///
/// todo(hl): we can find the output file size more precisely by checking the time range
/// of each row group and adding the sizes of those non-overlapping row groups. But now
/// we'd better not to expose the SST details in this level.
fn enforce_max_output_size(
    inputs: Vec<Vec<FileHandle>>,
    max_output_file_size: u64,
) -> Vec<Vec<FileHandle>> {
    inputs
        .into_iter()
        .flat_map(|input| {
            // Callers only build merge groups of at least two files.
            debug_assert!(input.len() > 1);
            let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
            if estimated_output_size < max_output_file_size {
                // Total file size does not exceed the threshold, just return the original input.
                return vec![input];
            }
            // Greedily pack consecutive files until adding the next file would exceed
            // the threshold, then start a new group.
            let mut splits = vec![];
            let mut current_group = vec![];
            let mut current_size = 0;
            for f in input {
                // Guard on non-emptiness so an oversized leading file never produces an
                // empty group (the original code pushed an empty Vec here and relied on
                // the trailing filter to discard it).
                if !current_group.is_empty() && current_size + f.size() > max_output_file_size {
                    splits.push(std::mem::take(&mut current_group));
                    current_size = 0;
                }
                current_size += f.size();
                current_group.push(f);
            }
            if !current_group.is_empty() {
                splits.push(current_group);
            }
            splits
        })
        // Merging a single file is pointless; drop singleton groups.
        .filter(|p| p.len() > 1)
        .collect()
}

/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
/// the solution with minimum overhead according to files sizes to be merged.
/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
Expand Down Expand Up @@ -305,10 +365,12 @@ fn find_latest_window_in_seconds<'a>(
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;

use super::*;
use crate::compaction::test_util::{new_file_handle, new_file_handles};
use crate::sst::file::Level;
use crate::sst::file::{FileMeta, Level};
use crate::test_util::NoopFilePurger;

#[test]
fn test_get_latest_window_in_seconds() {
Expand Down Expand Up @@ -525,8 +587,16 @@ mod tests {
let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output = TwcsPicker::new(4, usize::MAX, 1, usize::MAX, None)
.build_output(&mut windows, active_window);
let output = TwcsPicker {
max_active_window_runs: 4,
max_active_window_files: usize::MAX,
max_inactive_window_runs: 1,
max_inactive_window_files: usize::MAX,
time_window_seconds: None,
max_output_file_size: None,
append_mode: false,
}
.build_output(&mut windows, active_window);

let output = output
.iter()
Expand Down Expand Up @@ -641,5 +711,43 @@ mod tests {
.check();
}

fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
inputs
.iter()
.map(|(start, end, size)| {
FileHandle::new(
FileMeta {
region_id: Default::default(),
file_id: Default::default(),
time_range: (
Timestamp::new_millisecond(*start),
Timestamp::new_millisecond(*end),
),
level: 0,
file_size: *size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
},
Arc::new(NoopFilePurger),
)
})
.collect()
}

#[test]
fn test_limit_output_size() {
let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
let runs = find_sorted_runs(&mut files);
assert_eq!(6, runs.len());
let files_to_merge = reduce_runs(runs, 2);

let enforced = enforce_max_output_size(files_to_merge, 2);
assert_eq!(2, enforced.len());
assert_eq!(2, enforced[0].len());
assert_eq!(2, enforced[1].len());
}

// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}
Loading