diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 16537f1ae8d0..7aedf7351585 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::hash_map::Entry; +use std::collections::{BTreeMap, HashMap}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -84,35 +85,41 @@ impl TwcsPicker { /// fragmentation. For other windows, we allow at most 1 file at each window. fn build_output( &self, - time_windows: &BTreeMap<i64, Vec<FileHandle>>, + time_windows: &BTreeMap<i64, Window>, active_window: Option<i64>, ) -> Vec<CompactionOutput> { let mut output = vec![]; for (window, files) in time_windows { + let files_in_window = &files.files; + // we only remove deletion markers once no file in current window overlaps with any other window. + let filter_deleted = !files.overlapping; + if let Some(active_window) = active_window && *window == active_window { - if files.len() > self.max_active_window_files { + if files_in_window.len() > self.max_active_window_files { output.push(CompactionOutput { output_file_id: FileId::random(), output_level: 1, // we only have two levels and always compact to l1 - inputs: files.clone(), + inputs: files_in_window.clone(), + filter_deleted, }); } else { debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window); } } else { // not active writing window - if files.len() > self.max_inactive_window_files { + if files_in_window.len() > self.max_inactive_window_files { output.push(CompactionOutput { output_file_id: FileId::random(), output_level: 1, - inputs: files.clone(), + inputs: files_in_window.clone(), + filter_deleted, }); } else { debug!( "No enough files, current: {}, max_inactive_window_files: {}", - files.len(), + files_in_window.len(), self.max_inactive_window_files ) } 
@@ -195,24 +202,99 @@ impl Picker for TwcsPicker { } } +struct Window { + start: Timestamp, + end: Timestamp, + files: Vec<FileHandle>, + time_window: i64, + overlapping: bool, +} + +impl Window { + /// Creates a new [Window] with given file. + fn new_with_file(file: FileHandle) -> Self { + let (start, end) = file.time_range(); + Self { + start, + end, + files: vec![file], + time_window: 0, + overlapping: false, + } + } + + /// Returns the time range of all files in current window (inclusive). + fn range(&self) -> (Timestamp, Timestamp) { + (self.start, self.end) + } + + /// Adds a new file to window and updates time range. + fn add_file(&mut self, file: FileHandle) { + let (start, end) = file.time_range(); + self.start = self.start.min(start); + self.end = self.end.max(end); + self.files.push(file); + } +} + /// Assigns files to windows with predefined window size (in seconds) by their max timestamps. fn assign_to_windows<'a>( files: impl Iterator<Item = &'a FileHandle>, time_window_size: i64, -) -> BTreeMap<i64, Vec<FileHandle>> { - let mut windows: BTreeMap<i64, Vec<FileHandle>> = BTreeMap::new(); +) -> BTreeMap<i64, Window> { + let mut windows: HashMap<i64, Window> = HashMap::new(); // Iterates all files and assign to time windows according to max timestamp - for file in files { - let (_, end) = file.time_range(); + for f in files { + let (_, end) = f.time_range(); let time_window = end .convert_to(TimeUnit::Second) .unwrap() .value() .align_to_ceil_by_bucket(time_window_size) .unwrap_or(i64::MIN); - windows.entry(time_window).or_default().push(file.clone()); + + match windows.entry(time_window) { + Entry::Occupied(mut e) => { + e.get_mut().add_file(f.clone()); + } + Entry::Vacant(e) => { + let mut window = Window::new_with_file(f.clone()); + window.time_window = time_window; + e.insert(window); + } + } + } + if windows.is_empty() { + return BTreeMap::new(); } - windows + + let mut windows = windows.into_values().collect::<Vec<_>>(); + windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse())); + + let mut current_range: (Timestamp, 
Timestamp) = windows[0].range(); // windows cannot be empty. + + for idx in 1..windows.len() { + let next_range = windows[idx].range(); + if overlaps(&current_range, &next_range) { + windows[idx - 1].overlapping = true; + windows[idx].overlapping = true; + } + current_range = ( + current_range.0.min(next_range.0), + current_range.1.max(next_range.1), + ); + } + + windows.into_iter().map(|w| (w.time_window, w)).collect() +} + +/// Checks if two inclusive timestamp ranges overlap with each other. +fn overlaps(l: &(Timestamp, Timestamp), r: &(Timestamp, Timestamp)) -> bool { + let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) }; + let (_, l_end) = l; + let (r_start, _) = r; + + r_start <= l_end } /// Finds the latest active writing window among all files. @@ -344,6 +426,7 @@ impl TwcsCompactionTask { sst_layer.clone(), &output.inputs, append_mode, + output.filter_deleted, ) .await?; let file_meta_opt = sst_layer @@ -572,6 +655,8 @@ pub(crate) struct CompactionOutput { pub output_level: Level, /// Compaction input files. pub inputs: Vec<FileHandle>, + /// Whether to remove deletion markers. + pub filter_deleted: bool, } /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. @@ -580,10 +665,12 @@ async fn build_sst_reader( sst_layer: AccessLayerRef, inputs: &[FileHandle], append_mode: bool, + filter_deleted: bool, ) -> error::Result<BoxedBatchReader> { let scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?) .with_files(inputs.to_vec()) .with_append_mode(append_mode) + .with_filter_deleted(filter_deleted) // We ignore file not found error during compaction. 
.with_ignore_file_not_found(true); SeqScan::new(scan_input).build_reader().await @@ -642,7 +729,7 @@ mod tests { .iter(), 3, ); - assert_eq!(5, windows.get(&0).unwrap().len()); + assert_eq!(5, windows.get(&0).unwrap().files.len()); let files = [FileId::random(); 3]; let windows = assign_to_windows( @@ -656,15 +743,148 @@ mod tests { ); assert_eq!( files[0], - windows.get(&0).unwrap().first().unwrap().file_id() + windows.get(&0).unwrap().files.first().unwrap().file_id() ); assert_eq!( files[1], - windows.get(&3).unwrap().first().unwrap().file_id() + windows.get(&3).unwrap().files.first().unwrap().file_id() ); assert_eq!( files[2], - windows.get(&12).unwrap().first().unwrap().file_id() + windows.get(&12).unwrap().files.first().unwrap().file_id() + ); + } + + /// (Window value, overlapping, files' time ranges in window) + type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>); + + fn check_assign_to_windows_with_overlapping( + file_time_ranges: &[(i64, i64)], + time_window: i64, + expected_files: &[ExpectedWindowSpec], + ) { + let files: Vec<_> = (0..file_time_ranges.len()) + .map(|_| FileId::random()) + .collect(); + + let file_handles = files + .iter() + .zip(file_time_ranges.iter()) + .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0)) + .collect::<Vec<_>>(); + + let windows = assign_to_windows(file_handles.iter(), time_window); + + for (expected_window, overlapping, window_files) in expected_files { + let actual_window = windows.get(expected_window).unwrap(); + assert_eq!(*overlapping, actual_window.overlapping); + let mut file_ranges = actual_window + .files + .iter() + .map(|f| { + let (s, e) = f.time_range(); + (s.value(), e.value()) + }) + .collect::<Vec<_>>(); + file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1))); + assert_eq!(window_files, &file_ranges); + } + } + + #[test] + fn test_assign_to_windows_with_overlapping() { + check_assign_to_windows_with_overlapping( + &[(0, 999), (1000, 1999), (2000, 2999)], + 2, + &[ + (0, false, 
vec![(0, 999)]), + (2, false, vec![(1000, 1999), (2000, 2999)]), + ], + ); + + check_assign_to_windows_with_overlapping( + &[(0, 1), (0, 999), (100, 2999)], + 2, + &[ + (0, true, vec![(0, 1), (0, 999)]), + (2, true, vec![(100, 2999)]), + ], + ); + + check_assign_to_windows_with_overlapping( + &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)], + 2, + &[ + (0, false, vec![(0, 999)]), + (2, false, vec![(1000, 1999), (2000, 2999)]), + (4, false, vec![(3000, 3999)]), + ], + ); + + check_assign_to_windows_with_overlapping( + &[ + (0, 999), + (1000, 1999), + (2000, 2999), + (3000, 3999), + (0, 3999), + ], + 2, + &[ + (0, true, vec![(0, 999)]), + (2, true, vec![(1000, 1999), (2000, 2999)]), + (4, true, vec![(0, 3999), (3000, 3999)]), + ], + ); + + check_assign_to_windows_with_overlapping( + &[ + (0, 999), + (1000, 1999), + (2000, 2999), + (3000, 3999), + (1999, 3999), + ], + 2, + &[ + (0, false, vec![(0, 999)]), + (2, true, vec![(1000, 1999), (2000, 2999)]), + (4, true, vec![(1999, 3999), (3000, 3999)]), + ], + ); + + check_assign_to_windows_with_overlapping( + &[ + (0, 999), // window 0 + (1000, 1999), // window 2 + (2000, 2999), // window 2 + (3000, 3999), // window 4 + (2999, 3999), // window 4 + ], + 2, + &[ + // window 2 overlaps with window 4 + (0, false, vec![(0, 999)]), + (2, true, vec![(1000, 1999), (2000, 2999)]), + (4, true, vec![(2999, 3999), (3000, 3999)]), + ], + ); + + check_assign_to_windows_with_overlapping( + &[ + (0, 999), // window 0 + (1000, 1999), // window 2 + (2000, 2999), // window 2 + (3000, 3999), // window 4 + (0, 1000), // // window 2 + ], + 2, + &[ + // only window 0 overlaps with window 2. 
+ (0, true, vec![(0, 999)]), + (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]), + (4, false, vec![(3000, 3999)]), + ], ); } diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index a5277bf2a496..8e6ac03b83d1 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -149,6 +149,102 @@ async fn test_compaction_region() { assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec); } +#[tokio::test] +async fn test_compaction_region_with_overlapping() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_files", "2") + .insert_option("compaction.twcs.max_inactive_window_files", "2") + .insert_option("compaction.twcs.time_window", "1h") + .build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::<Vec<_>>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 4 SSTs for compaction. 
+ put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 0..2400).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 3600..10800).await; // window 10800 + delete_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600 + + let result = engine + .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .await + .unwrap(); + assert_eq!(result.affected_rows, 0); + + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!( + 2, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + let stream = scanner.scan().await.unwrap(); + + let vec = collect_stream_ts(stream).await; + assert_eq!((3600..10800).map(|i| { i * 1000 }).collect::<Vec<_>>(), vec); +} + +#[tokio::test] +async fn test_compaction_region_with_overlapping_delete_all() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_files", "2") + .insert_option("compaction.twcs.max_inactive_window_files", "2") + .insert_option("compaction.twcs.time_window", "1h") + .build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::<Vec<_>>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 4 SSTs for compaction. 
+ put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 0..2400).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600 + delete_and_flush(&engine, region_id, &column_schemas, 0..10800).await; // window 10800 + + let result = engine + .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .await + .unwrap(); + assert_eq!(result.affected_rows, 0); + + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!( + 4, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + let stream = scanner.scan().await.unwrap(); + + let vec = collect_stream_ts(stream).await; + assert!(vec.is_empty()); +} + // For issue https://github.com/GreptimeTeam/greptimedb/issues/3633 #[tokio::test] async fn test_readonly_during_compaction() { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 59debe15ac45..27a746f3ba6a 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -396,6 +396,7 @@ pub struct CreateRequestBuilder { primary_key: Option>, all_not_null: bool, engine: String, + ts_type: ConcreteDataType, } impl Default for CreateRequestBuilder { @@ -408,6 +409,7 @@ impl Default for CreateRequestBuilder { primary_key: None, all_not_null: false, engine: MITO_ENGINE_NAME.to_string(), + ts_type: ConcreteDataType::timestamp_millisecond_datatype(), } } } @@ -454,6 +456,12 @@ impl CreateRequestBuilder { self } + #[must_use] + pub fn with_ts_type(mut self, ty: ConcreteDataType) -> Self { + self.ts_type = ty; + self + } + pub fn build(&self) -> RegionCreateRequest { let mut column_id = 0; let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1); @@ -487,7 +495,7 @@ impl CreateRequestBuilder { column_metadatas.push(ColumnMetadata { column_schema: ColumnSchema::new( "ts", - ConcreteDataType::timestamp_millisecond_datatype(), + 
self.ts_type.clone(), // Time index is always not null. false, ),