From 50d6fa6eb65ebd9e70de02094ad51e5bcc96c305 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 5 Dec 2024 17:44:01 +0800 Subject: [PATCH] chore: more assertions --- src/mito2/src/compaction.rs | 4 +++- src/mito2/src/read/seq_scan.rs | 41 +++++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 873b1dced1e6..f4d4f267cd1f 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -589,7 +589,9 @@ impl<'a> CompactionSstReaderBuilder<'a> { scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?); } - SeqScan::new(scan_input, true).build_reader().await + SeqScan::new(scan_input, true) + .build_reader_for_compaction() + .await } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index dfa810857481..c47ba5542720 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -86,7 +86,12 @@ impl SeqScan { } /// Builds a [BoxedBatchReader] from sequential scan for compaction. - pub async fn build_reader(&self) -> Result { + /// + /// # Panics + /// Panics if the compaction flag is not set. + pub async fn build_reader_for_compaction(&self) -> Result { + assert!(self.compaction); + let part_metrics = PartitionMetrics::new( self.stream_ctx.input.mapper.metadata().region_id, 0, @@ -100,21 +105,16 @@ impl SeqScan { debug_assert_eq!(1, self.properties.partitions.len()); let partition_ranges = &self.properties.partitions[0]; - let reader = Self::build_all_merge_reader( - &self.stream_ctx, - partition_ranges, - self.compaction, - &part_metrics, - ) - .await?; + let reader = + Self::build_all_merge_reader(&self.stream_ctx, partition_ranges, &part_metrics).await?; Ok(Box::new(reader)) } /// Builds a merge reader that reads all data. + /// Callers MUST not split ranges before calling this method. async fn build_all_merge_reader( stream_ctx: &Arc, partition_ranges: &[PartitionRange], - compaction: bool, part_metrics: &PartitionMetrics, ) -> Result { let mut sources = Vec::new(); @@ -126,12 +126,19 @@ impl SeqScan { build_sources( stream_ctx, part_range, - compaction, + true, part_metrics, range_builder_list.clone(), &mut sources, ); } + + common_telemetry::debug!( + "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}", + stream_ctx.input.mapper.metadata().region_id, + partition_ranges.len(), + sources.len() + ); Self::build_reader_from_sources(stream_ctx, sources, None).await } @@ -370,6 +377,20 @@ fn build_sources( ) { // Gets range meta. let range_meta = &stream_ctx.ranges[part_range.identifier]; + #[cfg(debug_assertions)] + if compaction { + // Compaction expects input sources are not been splitted. + debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len()); + for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() { + // It should scan all row groups. + debug_assert_eq!( + -1, row_group_idx.row_group_index, + "Expect {} range scan all row groups, given: {}", + i, row_group_idx.row_group_index, + ); + } + } + sources.reserve(range_meta.row_group_indices.len()); for index in &range_meta.row_group_indices { let stream = if stream_ctx.is_mem_range_index(*index) {