Skip to content

Commit

Permalink
feat: yields empty batch after reading a range (#4845)
Browse files Browse the repository at this point in the history
* feat: add empty batch to end of range stream

* feat: add batch validation

* fix: validate batch order

* fix: not yield empty batch in compaction

* fix: empty record batch

* feat: add a flag to enable empty batch
  • Loading branch information
evenyag authored Oct 21, 2024
1 parent a50eea7 commit 5d28f7a
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/catalog/src/system_schema/memory_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl MemoryTableBuilder {
/// Construct the `information_schema.{table_name}` virtual table
pub async fn memory_records(&mut self) -> Result<RecordBatch> {
if self.columns.is_empty() {
RecordBatch::new_empty(self.schema.clone()).context(CreateRecordBatchSnafu)
Ok(RecordBatch::new_empty(self.schema.clone()))
} else {
RecordBatch::new(self.schema.clone(), std::mem::take(&mut self.columns))
.context(CreateRecordBatchSnafu)
Expand Down
14 changes: 10 additions & 4 deletions src/common/recordbatch/src/recordbatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::slice;
use std::sync::Arc;

use datafusion::arrow::util::pretty::pretty_format_batches;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
Expand Down Expand Up @@ -58,13 +59,18 @@ impl RecordBatch {
}

/// Create an empty [`RecordBatch`] from `schema`.
pub fn new_empty(schema: SchemaRef) -> Result<RecordBatch> {
pub fn new_empty(schema: SchemaRef) -> RecordBatch {
let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
Ok(RecordBatch {
let columns = schema
.column_schemas()
.iter()
.map(|col| col.data_type.create_mutable_vector(0).to_vector())
.collect();
RecordBatch {
schema,
columns: vec![],
columns,
df_record_batch,
})
}
}

pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
Expand Down
110 changes: 110 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,116 @@ impl Batch {
// Safety: sequences is not null so it actually returns Some.
self.sequences.get_data(index).unwrap()
}

/// Checks the batch is monotonic by timestamps.
#[cfg(debug_assertions)]
pub(crate) fn check_monotonic(&self) -> bool {
if self.timestamps_native().is_none() {
return true;
}

let timestamps = self.timestamps_native().unwrap();
let sequences = self.sequences.as_arrow().values();
timestamps.windows(2).enumerate().all(|(i, window)| {
let current = window[0];
let next = window[1];
let current_sequence = sequences[i];
let next_sequence = sequences[i + 1];
if current == next {
current_sequence >= next_sequence
} else {
current < next
}
})
}

/// Returns true if the given batch is behind the current batch.
#[cfg(debug_assertions)]
pub(crate) fn check_next_batch(&self, other: &Batch) -> bool {
// Checks the primary key and then the timestamp.
use std::cmp::Ordering;
self.primary_key()
.cmp(other.primary_key())
.then_with(|| self.last_timestamp().cmp(&other.first_timestamp()))
.then_with(|| other.first_sequence().cmp(&self.last_sequence()))
<= Ordering::Equal
}
}

/// A struct to check the batch is monotonic.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
last_batch: Option<Batch>,
}

#[cfg(debug_assertions)]
impl BatchChecker {
/// Returns true if the given batch is monotonic and behind
/// the last batch.
pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> bool {
if !batch.check_monotonic() {
return false;
}

// Checks the batch is behind the last batch.
// Then Updates the last batch.
let is_behind = self
.last_batch
.as_ref()
.map(|last| last.check_next_batch(batch))
.unwrap_or(true);
self.last_batch = Some(batch.clone());
is_behind
}

/// Formats current batch and last batch for debug.
pub(crate) fn format_batch(&self, batch: &Batch) -> String {
use std::fmt::Write;

let mut message = String::new();
if let Some(last) = &self.last_batch {
write!(
message,
"last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
last.primary_key(),
last.last_timestamp(),
last.last_sequence()
)
.unwrap();
}
write!(
message,
"batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
batch.primary_key(),
batch.timestamps(),
batch.sequences()
)
.unwrap();

message
}

/// Checks batches from the part range are monotonic. Otherwise, panics.
pub(crate) fn ensure_part_range_batch(
&mut self,
scanner: &str,
region_id: store_api::storage::RegionId,
partition: usize,
part_range: store_api::region_engine::PartitionRange,
batch: &Batch,
) {
if !self.check_monotonic(batch) {
panic!(
"{}: batch is not sorted, region_id: {}, partition: {}, part_range: {:?}, {}",
scanner,
region_id,
partition,
part_range,
self.format_batch(batch),
);
}
}
}

/// Len of timestamp in arrow row format.
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/read/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ impl ProjectionMapper {
self.output_schema.clone()
}

/// Returns an empty [RecordBatch].
pub(crate) fn empty_record_batch(&self) -> RecordBatch {
RecordBatch::new_empty(self.output_schema.clone())
}

/// Converts a [Batch] to a [RecordBatch].
///
/// The batch must match the `projection` using to build the mapper.
Expand Down
26 changes: 26 additions & 0 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ impl SeqScan {
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range();
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
Expand Down Expand Up @@ -230,6 +231,8 @@ impl SeqScan {
let cache = stream_ctx.input.cache_manager.as_deref();
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default();
while let Some(batch) = reader
.next_batch()
.await
Expand All @@ -240,6 +243,20 @@ impl SeqScan {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();

debug_assert!(!batch.is_empty());
if batch.is_empty() {
continue;
}

#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
"SeqScan",
stream_ctx.input.mapper.metadata().region_id,
partition,
part_range,
&batch,
);

let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
Expand All @@ -249,6 +266,15 @@ impl SeqScan {

fetch_start = Instant::now();
}

// Yields an empty part to indicate this range is terminated.
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield stream_ctx.input.mapper.empty_record_batch();
metrics.yield_cost += yield_start.elapsed();
}

metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
Expand Down
25 changes: 25 additions & 0 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl UnorderedScan {
);
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range();

let stream = try_stream! {
part_metrics.on_first_poll();
Expand All @@ -138,6 +139,8 @@ impl UnorderedScan {
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default();

let stream = Self::scan_partition_range(
stream_ctx.clone(),
Expand All @@ -150,6 +153,20 @@ impl UnorderedScan {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();

debug_assert!(!batch.is_empty());
if batch.is_empty() {
continue;
}

#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
"UnorderedScan",
stream_ctx.input.mapper.metadata().region_id,
partition,
part_range,
&batch,
);

let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
Expand All @@ -160,6 +177,14 @@ impl UnorderedScan {
fetch_start = Instant::now();
}

// Yields an empty part to indicate this range is terminated.
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield stream_ctx.input.mapper.empty_record_batch();
metrics.yield_cost += yield_start.elapsed();
}

metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
Expand Down
14 changes: 14 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ pub struct ScannerProperties {
/// Total rows that **may** return by scanner. This field is only read iff
/// [ScannerProperties::append_mode] is true.
total_rows: usize,

/// Whether to yield an empty batch to distinguish partition ranges.
distinguish_partition_range: bool,
}

impl ScannerProperties {
Expand All @@ -224,12 +227,19 @@ impl ScannerProperties {
self
}

/// Sets distinguish partition range for scanner.
pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
self.distinguish_partition_range = distinguish_partition_range;
self
}

/// Creates a new [`ScannerProperties`] with the given partitioning.
pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
Self {
partitions,
append_mode,
total_rows,
distinguish_partition_range: false,
}
}

Expand All @@ -244,6 +254,10 @@ impl ScannerProperties {
pub fn total_rows(&self) -> usize {
self.total_rows
}

pub fn distinguish_partition_range(&self) -> bool {
self.distinguish_partition_range
}
}

/// A scanner that provides a way to scan the region concurrently.
Expand Down

0 comments on commit 5d28f7a

Please sign in to comment.