Skip to content

Commit

Permalink
chore: more assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Dec 5, 2024
1 parent a58271b commit 50d6fa6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
4 changes: 3 additions & 1 deletion src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,9 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}

SeqScan::new(scan_input, true).build_reader().await
SeqScan::new(scan_input, true)
.build_reader_for_compaction()
.await
}
}

Expand Down
41 changes: 31 additions & 10 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ impl SeqScan {
}

/// Builds a [BoxedBatchReader] from sequential scan for compaction.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
assert!(self.compaction);

let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
0,
Expand All @@ -100,21 +105,16 @@ impl SeqScan {
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];

let reader = Self::build_all_merge_reader(
&self.stream_ctx,
partition_ranges,
self.compaction,
&part_metrics,
)
.await?;
let reader =
Self::build_all_merge_reader(&self.stream_ctx, partition_ranges, &part_metrics).await?;
Ok(Box::new(reader))
}

/// Builds a merge reader that reads all data.
/// Callers MUST not split ranges before calling this method.
async fn build_all_merge_reader(
stream_ctx: &Arc<StreamContext>,
partition_ranges: &[PartitionRange],
compaction: bool,
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
let mut sources = Vec::new();
Expand All @@ -126,12 +126,19 @@ impl SeqScan {
build_sources(
stream_ctx,
part_range,
compaction,
true,
part_metrics,
range_builder_list.clone(),
&mut sources,
);
}

common_telemetry::debug!(
"Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
stream_ctx.input.mapper.metadata().region_id,
partition_ranges.len(),
sources.len()
);
Self::build_reader_from_sources(stream_ctx, sources, None).await
}

Expand Down Expand Up @@ -370,6 +377,20 @@ fn build_sources(
) {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
if compaction {
// Compaction expects that input sources have not been split.

Check warning on line 382 in src/mito2/src/read/seq_scan.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"splitted" should be "split".

Check warning on line 382 in src/mito2/src/read/seq_scan.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"splitted" should be "split".
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
// It should scan all row groups.
debug_assert_eq!(
-1, row_group_idx.row_group_index,
"Expect {} range scan all row groups, given: {}",
i, row_group_idx.row_group_index,
);
}
}

sources.reserve(range_meta.row_group_indices.len());
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {
Expand Down

0 comments on commit 50d6fa6

Please sign in to comment.