From 5d28f7a91230cfc44f14d3bac6c291187e76b85e Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 21 Oct 2024 21:52:47 +0800 Subject: [PATCH] feat: yields empty batch after reading a range (#4845) * feat: add empty batch to end of range stream * feat: add batch validation * fix: validate batch order * fix: not yield empty batch in compaction * fix: empty record batch * feat: add a flag to enable empty batch --- src/catalog/src/system_schema/memory_table.rs | 2 +- src/common/recordbatch/src/recordbatch.rs | 14 ++- src/mito2/src/read.rs | 110 ++++++++++++++++++ src/mito2/src/read/projection.rs | 5 + src/mito2/src/read/seq_scan.rs | 26 +++++ src/mito2/src/read/unordered_scan.rs | 25 ++++ src/store-api/src/region_engine.rs | 14 +++ 7 files changed, 191 insertions(+), 5 deletions(-) diff --git a/src/catalog/src/system_schema/memory_table.rs b/src/catalog/src/system_schema/memory_table.rs index dd026010bb44..f5c675832e48 100644 --- a/src/catalog/src/system_schema/memory_table.rs +++ b/src/catalog/src/system_schema/memory_table.rs @@ -74,7 +74,7 @@ impl MemoryTableBuilder { /// Construct the `information_schema.{table_name}` virtual table pub async fn memory_records(&mut self) -> Result { if self.columns.is_empty() { - RecordBatch::new_empty(self.schema.clone()).context(CreateRecordBatchSnafu) + Ok(RecordBatch::new_empty(self.schema.clone())) } else { RecordBatch::new(self.schema.clone(), std::mem::take(&mut self.columns)) .context(CreateRecordBatchSnafu) diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 420901902b64..1459b87fac6f 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -17,6 +17,7 @@ use std::slice; use std::sync::Arc; use datafusion::arrow::util::pretty::pretty_format_batches; +use datatypes::prelude::DataType; use datatypes::schema::SchemaRef; use datatypes::value::Value; use datatypes::vectors::{Helper, VectorRef}; @@ -58,13 +59,18 @@ impl RecordBatch { } /// Create an empty [`RecordBatch`] from `schema`. - pub fn new_empty(schema: SchemaRef) -> Result { + pub fn new_empty(schema: SchemaRef) -> RecordBatch { let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone()); - Ok(RecordBatch { + let columns = schema + .column_schemas() + .iter() + .map(|col| col.data_type.create_mutable_vector(0).to_vector()) + .collect(); + RecordBatch { schema, - columns: vec![], + columns, df_record_batch, - }) + } } pub fn try_project(&self, indices: &[usize]) -> Result { diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index fbef5bb8ed75..f7ed4ca958d3 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -491,6 +491,116 @@ impl Batch { // Safety: sequences is not null so it actually returns Some. self.sequences.get_data(index).unwrap() } + + /// Checks the batch is monotonic by timestamps. + #[cfg(debug_assertions)] + pub(crate) fn check_monotonic(&self) -> bool { + if self.timestamps_native().is_none() { + return true; + } + + let timestamps = self.timestamps_native().unwrap(); + let sequences = self.sequences.as_arrow().values(); + timestamps.windows(2).enumerate().all(|(i, window)| { + let current = window[0]; + let next = window[1]; + let current_sequence = sequences[i]; + let next_sequence = sequences[i + 1]; + if current == next { + current_sequence >= next_sequence + } else { + current < next + } + }) + } + + /// Returns true if the given batch is behind the current batch. + #[cfg(debug_assertions)] + pub(crate) fn check_next_batch(&self, other: &Batch) -> bool { + // Checks the primary key and then the timestamp. + use std::cmp::Ordering; + self.primary_key() + .cmp(other.primary_key()) + .then_with(|| self.last_timestamp().cmp(&other.first_timestamp())) + .then_with(|| other.first_sequence().cmp(&self.last_sequence())) + <= Ordering::Equal + } +} + +/// A struct to check the batch is monotonic. +#[cfg(debug_assertions)] +#[derive(Default)] +pub(crate) struct BatchChecker { + last_batch: Option, +} + +#[cfg(debug_assertions)] +impl BatchChecker { + /// Returns true if the given batch is monotonic and behind + /// the last batch. + pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> bool { + if !batch.check_monotonic() { + return false; + } + + // Checks the batch is behind the last batch. + // Then Updates the last batch. + let is_behind = self + .last_batch + .as_ref() + .map(|last| last.check_next_batch(batch)) + .unwrap_or(true); + self.last_batch = Some(batch.clone()); + is_behind + } + + /// Formats current batch and last batch for debug. + pub(crate) fn format_batch(&self, batch: &Batch) -> String { + use std::fmt::Write; + + let mut message = String::new(); + if let Some(last) = &self.last_batch { + write!( + message, + "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ", + last.primary_key(), + last.last_timestamp(), + last.last_sequence() + ) + .unwrap(); + } + write!( + message, + "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}", + batch.primary_key(), + batch.timestamps(), + batch.sequences() + ) + .unwrap(); + + message + } + + /// Checks batches from the part range are monotonic. Otherwise, panics. + pub(crate) fn ensure_part_range_batch( + &mut self, + scanner: &str, + region_id: store_api::storage::RegionId, + partition: usize, + part_range: store_api::region_engine::PartitionRange, + batch: &Batch, + ) { + if !self.check_monotonic(batch) { + panic!( + "{}: batch is not sorted, region_id: {}, partition: {}, part_range: {:?}, {}", + scanner, + region_id, + partition, + part_range, + self.format_batch(batch), + ); + } + } } /// Len of timestamp in arrow row format. diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 375248d4186c..9ba5f6eccf1e 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -160,6 +160,11 @@ impl ProjectionMapper { self.output_schema.clone() } + /// Returns an empty [RecordBatch]. + pub(crate) fn empty_record_batch(&self) -> RecordBatch { + RecordBatch::new_empty(self.output_schema.clone()) + } + /// Converts a [Batch] to a [RecordBatch]. /// /// The batch must match the `projection` using to build the mapper. diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index fe4054632e97..8d867d7d9222 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -203,6 +203,7 @@ impl SeqScan { let semaphore = self.semaphore.clone(); let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.compaction; + let distinguish_range = self.properties.distinguish_partition_range(); let part_metrics = PartitionMetrics::new( self.stream_ctx.input.mapper.metadata().region_id, partition, @@ -230,6 +231,8 @@ impl SeqScan { let cache = stream_ctx.input.cache_manager.as_deref(); let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); + #[cfg(debug_assertions)] + let mut checker = crate::read::BatchChecker::default(); while let Some(batch) = reader .next_batch() .await @@ -240,6 +243,20 @@ impl SeqScan { metrics.num_batches += 1; metrics.num_rows += batch.num_rows(); + debug_assert!(!batch.is_empty()); + if batch.is_empty() { + continue; + } + + #[cfg(debug_assertions)] + checker.ensure_part_range_batch( + "SeqScan", + stream_ctx.input.mapper.metadata().region_id, + partition, + part_range, + &batch, + ); + let convert_start = Instant::now(); let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?; metrics.convert_cost += convert_start.elapsed(); @@ -249,6 +266,15 @@ impl SeqScan { fetch_start = Instant::now(); } + + // Yields an empty part to indicate this range is terminated. + // The query engine can use this to optimize some queries. + if distinguish_range { + let yield_start = Instant::now(); + yield stream_ctx.input.mapper.empty_record_batch(); + metrics.yield_cost += yield_start.elapsed(); + } + metrics.scan_cost += fetch_start.elapsed(); part_metrics.merge_metrics(&metrics); } diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 992cba9d5c8c..c51f349cbd03 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -129,6 +129,7 @@ impl UnorderedScan { ); let stream_ctx = self.stream_ctx.clone(); let part_ranges = self.properties.partitions[partition].clone(); + let distinguish_range = self.properties.distinguish_partition_range(); let stream = try_stream! { part_metrics.on_first_poll(); @@ -138,6 +139,8 @@ impl UnorderedScan { for part_range in part_ranges { let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); + #[cfg(debug_assertions)] + let mut checker = crate::read::BatchChecker::default(); let stream = Self::scan_partition_range( stream_ctx.clone(), @@ -150,6 +153,20 @@ impl UnorderedScan { metrics.num_batches += 1; metrics.num_rows += batch.num_rows(); + debug_assert!(!batch.is_empty()); + if batch.is_empty() { + continue; + } + + #[cfg(debug_assertions)] + checker.ensure_part_range_batch( + "UnorderedScan", + stream_ctx.input.mapper.metadata().region_id, + partition, + part_range, + &batch, + ); + let convert_start = Instant::now(); let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?; metrics.convert_cost += convert_start.elapsed(); @@ -160,6 +177,14 @@ impl UnorderedScan { fetch_start = Instant::now(); } + // Yields an empty part to indicate this range is terminated. + // The query engine can use this to optimize some queries. + if distinguish_range { + let yield_start = Instant::now(); + yield stream_ctx.input.mapper.empty_record_batch(); + metrics.yield_cost += yield_start.elapsed(); + } + metrics.scan_cost += fetch_start.elapsed(); part_metrics.merge_metrics(&metrics); } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 04bc6df6bcee..86e96a161b2e 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -203,6 +203,9 @@ pub struct ScannerProperties { /// Total rows that **may** return by scanner. This field is only read iff /// [ScannerProperties::append_mode] is true. total_rows: usize, + + /// Whether to yield an empty batch to distinguish partition ranges. + distinguish_partition_range: bool, } impl ScannerProperties { @@ -224,12 +227,19 @@ impl ScannerProperties { self } + /// Sets distinguish partition range for scanner. + pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self { + self.distinguish_partition_range = distinguish_partition_range; + self + } + /// Creates a new [`ScannerProperties`] with the given partitioning. pub fn new(partitions: Vec>, append_mode: bool, total_rows: usize) -> Self { Self { partitions, append_mode, total_rows, + distinguish_partition_range: false, } } @@ -244,6 +254,10 @@ impl ScannerProperties { pub fn total_rows(&self) -> usize { self.total_rows } + + pub fn distinguish_partition_range(&self) -> bool { + self.distinguish_partition_range + } } /// A scanner that provides a way to scan the region concurrently.