diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 437e8e3da30d..7e82bc51a9ba 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -35,6 +35,8 @@ mod open_test;
 #[cfg(test)]
 mod projection_test;
 #[cfg(test)]
+mod prune_test;
+#[cfg(test)]
 mod truncate_test;
 
 use std::sync::Arc;
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index 9a88ecbc4989..de35ead2772d 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -44,7 +44,12 @@ async fn put_and_flush(
     put_rows(engine, region_id, rows).await;
 
     let Output::AffectedRows(rows) = engine
-        .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
+        .handle_request(
+            region_id,
+            RegionRequest::Flush(RegionFlushRequest {
+                row_group_size: None,
+            }),
+        )
         .await
         .unwrap()
     else {
@@ -79,7 +84,12 @@ async fn delete_and_flush(
     assert_eq!(row_cnt, rows_affected);
 
     let Output::AffectedRows(rows) = engine
-        .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
+        .handle_request(
+            region_id,
+            RegionRequest::Flush(RegionFlushRequest {
+                row_group_size: None,
+            }),
+        )
         .await
         .unwrap()
     else {
diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs
index 35c86c184371..0ffbddca65a7 100644
--- a/src/mito2/src/engine/drop_test.rs
+++ b/src/mito2/src/engine/drop_test.rs
@@ -67,7 +67,7 @@ async fn test_engine_drop_region() {
         rows: build_rows_for_key("a", 0, 2, 0),
     };
     put_rows(&engine, region_id, rows).await;
-    flush_region(&engine, region_id).await;
+    flush_region(&engine, region_id, None).await;
 
     // drop the created region.
     engine
diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs
index 84eb8b0eec44..63876149e8d2 100644
--- a/src/mito2/src/engine/flush_test.rs
+++ b/src/mito2/src/engine/flush_test.rs
@@ -49,7 +49,7 @@ async fn test_manual_flush() {
     };
     put_rows(&engine, region_id, rows).await;
 
-    flush_region(&engine, region_id).await;
+    flush_region(&engine, region_id, None).await;
 
     let request = ScanRequest::default();
     let scanner = engine.scanner(region_id, request).unwrap();
@@ -164,7 +164,7 @@ async fn test_write_stall() {
     tokio::spawn(async move {
         listener.wait().await;
 
-        flush_region(&engine_cloned, region_id).await;
+        flush_region(&engine_cloned, region_id, None).await;
     });
 
     // Triggers write stall.
@@ -212,7 +212,7 @@ async fn test_flush_empty() {
         .await
         .unwrap();
 
-    flush_region(&engine, region_id).await;
+    flush_region(&engine, region_id, None).await;
 
     let request = ScanRequest::default();
     let scanner = engine.scanner(region_id, request).unwrap();
@@ -247,7 +247,7 @@ async fn test_flush_reopen_region() {
     };
     put_rows(&engine, region_id, rows).await;
 
-    flush_region(&engine, region_id).await;
+    flush_region(&engine, region_id, None).await;
 
     let check_region = || {
         let region = engine.get_region(region_id).unwrap();
         let version_data = region.version_control.current();
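Note: this change threads an optional row-group size through the whole flush path (`RegionFlushRequest` -> `RegionFlushTask` -> `WriteOptions`), which is what lets the new prune test below force tiny row groups. A minimal caller-side sketch using only the request type from this diff (engine and region setup elided):

    use store_api::region_request::{RegionFlushRequest, RegionRequest};

    /// Builds a flush request that forces 5-row row groups (`None` keeps the default).
    fn small_row_group_flush() -> RegionRequest {
        RegionRequest::Flush(RegionFlushRequest {
            row_group_size: Some(5),
        })
    }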
diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs
new file mode 100644
index 000000000000..503839c66ce7
--- /dev/null
+++ b/src/mito2/src/engine/prune_test.rs
@@ -0,0 +1,102 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::Rows;
+use common_query::logical_plan::DfExpr;
+use common_query::prelude::Expr;
+use common_recordbatch::RecordBatches;
+use datafusion_common::ScalarValue;
+use datafusion_expr::lit;
+use store_api::region_engine::RegionEngine;
+use store_api::region_request::RegionRequest;
+use store_api::storage::{RegionId, ScanRequest};
+
+use crate::config::MitoConfig;
+use crate::test_util::{
+    build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
+};
+
+async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
+    let mut env = TestEnv::new();
+    let engine = env.create_engine(MitoConfig::default()).await;
+
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    put_rows(
+        &engine,
+        region_id,
+        Rows {
+            schema: column_schemas.clone(),
+            rows: build_rows(0, 10),
+        },
+    )
+    .await;
+    flush_region(&engine, region_id, Some(5)).await;
+
+    let stream = engine
+        .handle_query(
+            region_id,
+            ScanRequest {
+                filters: vec![Expr::from(expr)],
+                ..Default::default()
+            },
+        )
+        .await
+        .unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
+
+#[tokio::test]
+async fn test_read_parquet_stats() {
+    common_telemetry::init_default_ut_logging();
+
+    check_prune_row_groups(
+        datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))),
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
+| 8     | 8.0     | 1970-01-01T00:00:08 |
+| 9     | 9.0     | 1970-01-01T00:00:09 |
++-------+---------+---------------------+",
+    )
+    .await;
+
+    check_prune_row_groups(
+        datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
+| 8     | 8.0     | 1970-01-01T00:00:08 |
+| 9     | 9.0     | 1970-01-01T00:00:09 |
++-------+---------+---------------------+",
+    )
+    .await;
+}
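Note: with 10 rows flushed at `row_group_size = Some(5)`, the SST holds two row groups (rows 0-4 and rows 5-9). Both filters select values strictly greater than 4, so the first group's max (`ts = 4000`, `tag_0 = "4"`) rules it out and only the second group is scanned; the expected tables above are exactly that group. A hedged sketch for checking the split directly against the flushed file (the path handling is hypothetical, not part of the test utilities):

    use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

    /// Counts row groups in a parquet file; for the test above this should be 2.
    async fn row_group_count(path: &str) -> usize {
        let file = tokio::fs::File::open(path).await.unwrap();
        let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
        builder.metadata().num_row_groups()
    }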
diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs
index 03386369bd2d..5903eb42c0b7 100644
--- a/src/mito2/src/engine/truncate_test.rs
+++ b/src/mito2/src/engine/truncate_test.rs
@@ -167,7 +167,12 @@ async fn test_engine_truncate_after_flush() {
     // Flush the region.
     engine
-        .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
+        .handle_request(
+            region_id,
+            RegionRequest::Flush(RegionFlushRequest {
+                row_group_size: None,
+            }),
+        )
         .await
         .unwrap();
 
@@ -304,7 +309,12 @@ async fn test_engine_truncate_during_flush() {
     let flush_task = tokio::spawn(async move {
         info!("do flush task!!!!");
         engine_cloned
-            .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
+            .handle_request(
+                region_id,
+                RegionRequest::Flush(RegionFlushRequest {
+                    row_group_size: None,
+                }),
+            )
             .await
    });
 
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index ff7ee5051e31..655a7389d7b0 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -190,6 +190,7 @@ pub(crate) struct RegionFlushTask {
     pub(crate) memtable_builder: MemtableBuilderRef,
     pub(crate) file_purger: FilePurgerRef,
     pub(crate) listener: WorkerListener,
+    pub(crate) row_group_size: Option<usize>,
 }
 
 impl RegionFlushTask {
@@ -272,7 +273,10 @@ impl RegionFlushTask {
     /// Flushes memtables to level 0 SSTs.
     async fn flush_memtables(&self, version: &VersionRef) -> Result<Vec<FileMeta>> {
         // TODO(yingwen): Make it configurable.
-        let write_opts = WriteOptions::default();
+        let mut write_opts = WriteOptions::default();
+        if let Some(row_group_size) = self.row_group_size {
+            write_opts.row_group_size = row_group_size;
+        }
 
         let memtables = version.memtables.immutables();
         let mut file_metas = Vec::with_capacity(memtables.len());
@@ -689,6 +693,7 @@ mod tests {
             memtable_builder: builder.memtable_builder(),
             file_purger: builder.file_purger(),
             listener: WorkerListener::default(),
+            row_group_size: None,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
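Note: the override above is the only place the request-scoped size takes effect; `None` leaves `WriteOptions::default()` untouched. The precedence rule in isolation (the default value here is illustrative, not the engine's actual constant):

    /// A request-scoped size wins; otherwise the writer default applies.
    fn effective_row_group_size(requested: Option<usize>, default_size: usize) -> usize {
        requested.unwrap_or(default_size)
    }

    #[test]
    fn request_size_overrides_default() {
        assert_eq!(effective_row_group_size(Some(5), 102_400), 5);
        assert_eq!(effective_row_group_size(None, 102_400), 102_400);
    }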
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 083238dcd06d..403c38b7fa81 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -165,8 +165,9 @@ impl ScanRegion {
             .collect();
 
         debug!(
-            "Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
+            "Seq scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
             self.version.metadata.region_id,
+            self.request,
             memtables.len(),
             files.len(),
             total_ssts
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 1f461735ee7e..0cced7e2575c 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -16,6 +16,7 @@
 
 mod format;
 pub mod reader;
+mod stats;
 pub mod writer;
 
 use common_base::readable_size::ReadableSize;
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index b8af414b1c97..416a2f5d719a 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -30,14 +30,18 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
 use api::v1::SemanticType;
-use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array};
+use datafusion_common::ScalarValue;
+use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array};
 use datatypes::arrow::datatypes::{
-    DataType, Field, FieldRef, Fields, Schema, SchemaRef, UInt16Type,
+    DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, UInt16Type,
 };
 use datatypes::arrow::record_batch::RecordBatch;
+use datatypes::prelude::DataType;
 use datatypes::vectors::{Helper, Vector};
+use parquet::file::metadata::RowGroupMetaData;
+use parquet::file::statistics::Statistics;
 use snafu::{ensure, OptionExt, ResultExt};
-use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
 use store_api::storage::consts::{
     OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
 };
@@ -47,6 +51,7 @@ use crate::error::{
     ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
 };
 use crate::read::{Batch, BatchBuilder, BatchColumn};
+use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 
 /// Number of columns that have fixed positions.
 ///
@@ -250,6 +255,66 @@ impl ReadFormat {
         Ok(())
     }
 
+    /// Returns min values of a specific column in row groups.
+    pub(crate) fn min_values(
+        &self,
+        row_groups: &[RowGroupMetaData],
+        column_id: ColumnId,
+    ) -> Option<ArrayRef> {
+        let column = self.metadata.column_by_id(column_id)?;
+        match column.semantic_type {
+            SemanticType::Tag => self.tag_values(row_groups, column, true),
+            SemanticType::Field => {
+                let index = self.field_id_to_index.get(&column_id)?;
+                Self::column_values(row_groups, column, *index, true)
+            }
+            SemanticType::Timestamp => {
+                let index = self.time_index_position();
+                Self::column_values(row_groups, column, index, true)
+            }
+        }
+    }
+
+    /// Returns max values of a specific column in row groups.
+    pub(crate) fn max_values(
+        &self,
+        row_groups: &[RowGroupMetaData],
+        column_id: ColumnId,
+    ) -> Option<ArrayRef> {
+        let column = self.metadata.column_by_id(column_id)?;
+        match column.semantic_type {
+            SemanticType::Tag => self.tag_values(row_groups, column, false),
+            SemanticType::Field => {
+                let index = self.field_id_to_index.get(&column_id)?;
+                Self::column_values(row_groups, column, *index, false)
+            }
+            SemanticType::Timestamp => {
+                let index = self.time_index_position();
+                Self::column_values(row_groups, column, index, false)
+            }
+        }
+    }
+
+    /// Returns null counts of a specific column in row groups.
+    pub(crate) fn null_counts(
+        &self,
+        row_groups: &[RowGroupMetaData],
+        column_id: ColumnId,
+    ) -> Option<ArrayRef> {
+        let column = self.metadata.column_by_id(column_id)?;
+        match column.semantic_type {
+            SemanticType::Tag => None,
+            SemanticType::Field => {
+                let index = self.field_id_to_index.get(&column_id)?;
+                Self::column_null_counts(row_groups, *index)
+            }
+            SemanticType::Timestamp => {
+                let index = self.time_index_position();
+                Self::column_null_counts(row_groups, index)
+            }
+        }
+    }
+
     /// Get fields from `record_batch`.
     fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
         record_batch
@@ -273,6 +338,148 @@ impl ReadFormat {
             })
             .collect()
     }
+
+    /// Returns min/max values of a specific tag.
+    fn tag_values(
+        &self,
+        row_groups: &[RowGroupMetaData],
+        column: &ColumnMetadata,
+        is_min: bool,
+    ) -> Option<ArrayRef> {
+        let is_first_tag = self
+            .metadata
+            .primary_key
+            .first()
+            .map(|id| *id == column.column_id)
+            .unwrap_or(false);
+        if !is_first_tag {
+            // Only the min-max of the first tag is available in the primary key.
+            return None;
+        }
+
+        let converter =
+            McmpRowCodec::new(vec![SortField::new(column.column_schema.data_type.clone())]);
+        let values = row_groups.iter().map(|meta| {
+            let stats = meta.column(self.primary_key_position()).statistics()?;
+            if !stats.has_min_max_set() {
+                return None;
+            }
+            match stats {
+                Statistics::Boolean(_) => None,
+                Statistics::Int32(_) => None,
+                Statistics::Int64(_) => None,
+                Statistics::Int96(_) => None,
+                Statistics::Float(_) => None,
+                Statistics::Double(_) => None,
+                Statistics::ByteArray(s) => {
+                    let bytes = if is_min { s.min_bytes() } else { s.max_bytes() };
+                    let mut values = converter.decode(bytes).ok()?;
+                    values.pop()
+                }
+                Statistics::FixedLenByteArray(_) => None,
+            }
+        });
+        let mut builder = column
+            .column_schema
+            .data_type
+            .create_mutable_vector(row_groups.len());
+        for value_opt in values {
+            match value_opt {
+                // Safety: We use the same data type to create the converter.
+                Some(v) => builder.push_value_ref(v.as_value_ref()),
+                None => builder.push_null(),
+            }
+        }
+        let vector = builder.to_vector();
+
+        Some(vector.to_arrow_array())
+    }
+
+    /// Returns min/max values of a specific non-tag column.
+    fn column_values(
+        row_groups: &[RowGroupMetaData],
+        column: &ColumnMetadata,
+        column_index: usize,
+        is_min: bool,
+    ) -> Option<ArrayRef> {
+        let null_scalar: ScalarValue = column
+            .column_schema
+            .data_type
+            .as_arrow_type()
+            .try_into()
+            .ok()?;
+        let scalar_values = row_groups
+            .iter()
+            .map(|meta| {
+                let stats = meta.column(column_index).statistics()?;
+                if !stats.has_min_max_set() {
+                    return None;
+                }
+                match stats {
+                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
+                        *s.min()
+                    } else {
+                        *s.max()
+                    }))),
+                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
+                        *s.min()
+                    } else {
+                        *s.max()
+                    }))),
+                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
+                        *s.min()
+                    } else {
+                        *s.max()
+                    }))),
+                    Statistics::Int96(_) => None,
+                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
+                        *s.min()
+                    } else {
+                        *s.max()
+                    }))),
+                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
+                        *s.min()
+                    } else {
+                        *s.max()
+                    }))),
+                    Statistics::ByteArray(s) => {
+                        let bytes = if is_min { s.min_bytes() } else { s.max_bytes() };
+                        let s = String::from_utf8(bytes.to_vec()).ok();
+                        Some(ScalarValue::Utf8(s))
+                    }
+                    Statistics::FixedLenByteArray(_) => None,
+                }
+            })
+            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
+            .collect::<Vec<_>>();
+        debug_assert_eq!(scalar_values.len(), row_groups.len());
+        ScalarValue::iter_to_array(scalar_values).ok()
+    }
+
+    /// Returns null counts of a specific non-tag column.
+    fn column_null_counts(
+        row_groups: &[RowGroupMetaData],
+        column_index: usize,
+    ) -> Option<ArrayRef> {
+        let values = row_groups.iter().map(|meta| {
+            let col = meta.column(column_index);
+            let stat = col.statistics()?;
+            Some(stat.null_count())
+        });
+        Some(Arc::new(UInt64Array::from_iter(values)))
+    }
+
+    /// Field index of the primary key.
+    fn primary_key_position(&self) -> usize {
+        self.arrow_schema.fields.len() - 3
+    }
+
+    /// Field index of the time index.
+    fn time_index_position(&self) -> usize {
+        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
+    }
 }
 
 /// Gets the arrow schema to store in parquet.
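Note: tags are mcmp-encoded into the composite `__primary_key` column, and the encoded key orders by its first field, so only the first tag's min/max can be recovered from the column chunk's statistics bytes; all other tags return `None` and are simply not prunable. A standalone sketch of the decode step used in `tag_values` (hypothetical helper, mirroring the diff's types):

    use crate::row_converter::{McmpRowCodec, RowCodec};
    use datatypes::value::Value;

    /// Decodes the first tag from the min/max statistics bytes of `__primary_key`.
    /// `codec` must be built with only the first tag's `SortField`, so it reads
    /// just the leading field of the encoded key.
    fn first_tag_value(codec: &McmpRowCodec, stat_bytes: &[u8]) -> Option<Value> {
        let mut values = codec.decode(stat_bytes).ok()?;
        values.pop()
    }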
@@ -328,12 +535,16 @@ fn internal_fields() -> [FieldRef; 3] {
     [
         Arc::new(Field::new_dictionary(
             PRIMARY_KEY_COLUMN_NAME,
-            DataType::UInt16,
-            DataType::Binary,
+            ArrowDataType::UInt16,
+            ArrowDataType::Binary,
             false,
         )),
-        Arc::new(Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false)),
-        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false)),
+        Arc::new(Field::new(
+            SEQUENCE_COLUMN_NAME,
+            ArrowDataType::UInt64,
+            false,
+        )),
+        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
     ]
 }
 
@@ -408,20 +619,23 @@ mod tests {
 
     fn build_test_arrow_schema() -> SchemaRef {
         let fields = vec![
-            Field::new("field1", DataType::Int64, true),
-            Field::new("field0", DataType::Int64, true),
+            Field::new("field1", ArrowDataType::Int64, true),
+            Field::new("field0", ArrowDataType::Int64, true),
             Field::new(
                 "ts",
-                DataType::Timestamp(TimeUnit::Millisecond, None),
+                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                 false,
             ),
             Field::new(
                 "__primary_key",
-                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary)),
+                ArrowDataType::Dictionary(
+                    Box::new(ArrowDataType::UInt16),
+                    Box::new(ArrowDataType::Binary),
+                ),
                 false,
             ),
-            Field::new("__sequence", DataType::UInt64, false),
-            Field::new("__op_type", DataType::UInt8, false),
+            Field::new("__sequence", ArrowDataType::UInt64, false),
+            Field::new("__op_type", ArrowDataType::UInt8, false),
         ];
         Arc::new(Schema::new(fields))
     }
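Note: the two position helpers rely on the fixed tail of the SST schema, visible in `build_test_arrow_schema` above: user fields first, then `ts`, `__primary_key`, `__sequence`, `__op_type`. The arithmetic, spelled out:

    /// For an SST schema of `num_fields` columns ending in
    /// [ts, __primary_key, __sequence, __op_type]:
    fn positions(num_fields: usize) -> (usize, usize) {
        const FIXED_POS_COLUMN_NUM: usize = 4;
        let primary_key_position = num_fields - 3;
        let time_index_position = num_fields - FIXED_POS_COLUMN_NUM;
        (primary_key_position, time_index_position)
    }

    // build_test_arrow_schema() has 6 fields: positions(6) == (3, 2),
    // i.e. "__primary_key" and "ts".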
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index a91d781dad92..e54391b15a4a 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -14,6 +14,7 @@
 
 //! Parquet reader.
 
+use std::collections::HashSet;
 use std::ops::Range;
 use std::sync::Arc;
 
@@ -44,6 +45,7 @@ use crate::error::{
 use crate::read::{Batch, BatchReader};
 use crate::sst::file::{FileHandle, FileId};
 use crate::sst::parquet::format::ReadFormat;
+use crate::sst::parquet::stats::RowGroupPruningStats;
 use crate::sst::parquet::PARQUET_METADATA_KEY;
 
 /// Parquet SST reader builder.
@@ -151,18 +153,17 @@ impl ParquetReaderBuilder {
         let key_value_meta = builder.metadata().file_metadata().key_value_metadata();
         let region_meta = self.get_region_metadata(file_path, key_value_meta)?;
 
-        // Prune row groups by metadata.
-        if let Some(predicate) = &self.predicate {
-            // TODO(yingwen): Now we encode tags into the full primary key so we need some approach
-            // to implement pruning.
-            let pruned_row_groups = predicate
-                .prune_row_groups(builder.metadata().row_groups())
-                .into_iter()
-                .enumerate()
-                .filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
-                .collect::<Vec<_>>();
-            builder = builder.with_row_groups(pruned_row_groups);
-        }
+        let column_ids: HashSet<_> = self
+            .projection
+            .as_ref()
+            .map(|p| p.iter().cloned().collect())
+            .unwrap_or_else(|| {
+                region_meta
+                    .column_metadatas
+                    .iter()
+                    .map(|c| c.column_id)
+                    .collect()
+            });
 
         let read_format = ReadFormat::new(Arc::new(region_meta));
 
         // The arrow schema converted from the region meta should be the same as parquet's.
@@ -179,6 +180,22 @@ impl ParquetReaderBuilder {
             }
         );
 
+        // Prune row groups by metadata.
+        if let Some(predicate) = &self.predicate {
+            let stats = RowGroupPruningStats::new(
+                builder.metadata().row_groups(),
+                &read_format,
+                column_ids,
+            );
+            let pruned_row_groups = predicate
+                .prune_with_stats(&stats)
+                .into_iter()
+                .enumerate()
+                .filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
+                .collect::<Vec<_>>();
+            builder = builder.with_row_groups(pruned_row_groups);
+        }
+
         let parquet_schema_desc = builder.metadata().file_metadata().schema_descr();
         if let Some(column_ids) = self.projection.as_ref() {
             let indices = read_format.projection_indices(column_ids.iter().copied());
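Note: pruning is now delegated to DataFusion's `PruningPredicate`, which only needs a `PruningStatistics` implementation; `RowGroupPruningStats` (next file) supplies one backed by row-group metadata. A toy stand-in showing the contract the mito types satisfy (the arrays here are made up: two containers covering [0, 4] and [5, 9]):

    use std::sync::Arc;

    use datafusion::physical_optimizer::pruning::PruningStatistics;
    use datafusion_common::Column;
    use datatypes::arrow::array::{ArrayRef, Int64Array};

    /// Toy statistics: two row groups with known min/max for every column.
    struct ToyStats;

    impl PruningStatistics for ToyStats {
        fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
            Some(Arc::new(Int64Array::from(vec![0, 5])))
        }

        fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
            Some(Arc::new(Int64Array::from(vec![4, 9])))
        }

        fn num_containers(&self) -> usize {
            2
        }

        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
            // Returning None means "unknown"; the pruner stays conservative.
            None
        }
    }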
diff --git a/src/mito2/src/sst/parquet/stats.rs b/src/mito2/src/sst/parquet/stats.rs
new file mode 100644
index 000000000000..f0cda3846737
--- /dev/null
+++ b/src/mito2/src/sst/parquet/stats.rs
@@ -0,0 +1,83 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Statistics of parquet SSTs.
+
+use std::collections::HashSet;
+
+use datafusion::physical_optimizer::pruning::PruningStatistics;
+use datafusion_common::Column;
+use datatypes::arrow::array::ArrayRef;
+use parquet::file::metadata::RowGroupMetaData;
+use store_api::storage::ColumnId;
+
+use crate::sst::parquet::format::ReadFormat;
+
+/// Statistics for pruning row groups.
+pub(crate) struct RowGroupPruningStats<'a> {
+    /// Metadata of SST row groups.
+    row_groups: &'a [RowGroupMetaData],
+    /// Helper to read the SST.
+    read_format: &'a ReadFormat,
+    /// Projected column ids to read.
+    ///
+    /// We need column ids to distinguish different columns with the same name,
+    /// e.g. when a column is dropped and then added again.
+    column_ids: HashSet<ColumnId>,
+}
+
+impl<'a> RowGroupPruningStats<'a> {
+    /// Creates statistics for pruning the given `row_groups`.
+    pub(crate) fn new(
+        row_groups: &'a [RowGroupMetaData],
+        read_format: &'a ReadFormat,
+        column_ids: HashSet<ColumnId>,
+    ) -> Self {
+        Self {
+            row_groups,
+            read_format,
+            column_ids,
+        }
+    }
+
+    /// Returns the column id of a specific column name if we need to read it.
+    fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
+        // Only use stats when the column to read has the same id as the column in the SST.
+        self.read_format
+            .metadata()
+            .column_by_name(name)
+            .and_then(|col| self.column_ids.get(&col.column_id).copied())
+    }
+}
+
+impl<'a> PruningStatistics for RowGroupPruningStats<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let column_id = self.column_id_to_prune(&column.name)?;
+        self.read_format.min_values(self.row_groups, column_id)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        let column_id = self.column_id_to_prune(&column.name)?;
+        self.read_format.max_values(self.row_groups, column_id)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.row_groups.len()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let column_id = self.column_id_to_prune(&column.name)?;
+        self.read_format.null_counts(self.row_groups, column_id)
+    }
+}
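Note: the id check in `column_id_to_prune` exists because names are not stable identities: dropping a column and re-adding it under the same name yields a new `ColumnId`, and stats from SSTs written for the old id must not answer for the new column. The guard in isolation (ids are illustrative):

    use std::collections::HashSet;

    /// The SST knows "a" as id 3; after a drop/re-add the scan projects id 7.
    /// Stats are only used when both sides agree on the id.
    fn usable_id(sst_id_for_name: u32, projected: &HashSet<u32>) -> Option<u32> {
        projected.get(&sst_id_for_name).copied()
    }

    #[test]
    fn stale_column_is_ignored() {
        let projected: HashSet<u32> = [7].into_iter().collect();
        // No stats for the stale id: the row group is conservatively kept.
        assert_eq!(usable_id(3, &projected), None);
    }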
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 0484b2c9d087..c9621249212c 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -575,9 +575,12 @@ pub async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
 }
 
 /// Flush a region manually.
-pub async fn flush_region(engine: &MitoEngine, region_id: RegionId) {
+pub async fn flush_region(engine: &MitoEngine, region_id: RegionId, row_group_size: Option<usize>) {
     let Output::AffectedRows(rows) = engine
-        .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
+        .handle_request(
+            region_id,
+            RegionRequest::Flush(RegionFlushRequest { row_group_size }),
+        )
         .await
         .unwrap()
     else {
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 6a2147b19580..f507da5f6362 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -534,8 +534,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                         .await;
                     continue;
                 }
-                DdlRequest::Flush(_) => {
-                    self.handle_flush_request(ddl.region_id, ddl.sender).await;
+                DdlRequest::Flush(req) => {
+                    self.handle_flush_request(ddl.region_id, req.row_group_size, ddl.sender)
+                        .await;
                     continue;
                 }
                 DdlRequest::Compact(_) => {
diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs
index ceb4ad4586ee..0231217efb0d 100644
--- a/src/mito2/src/worker/handle_alter.rs
+++ b/src/mito2/src/worker/handle_alter.rs
@@ -80,7 +80,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         info!("Flush region: {} before alteration", region_id);
 
         // Try to submit a flush task.
-        let task = self.new_flush_task(&region, FlushReason::Alter);
+        let task = self.new_flush_task(&region, FlushReason::Alter, None);
         if let Err(e) =
             self.flush_scheduler
                 .schedule_flush(region.region_id, &region.version_control, task)
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 2c1bad5cf544..7640c31fa74c 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -31,13 +31,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
     pub(crate) async fn handle_flush_request(
         &mut self,
         region_id: RegionId,
+        row_group_size: Option<usize>,
         mut sender: OptionOutputTx,
     ) {
         let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
             return;
         };
 
-        let mut task = self.new_flush_task(&region, FlushReason::Manual);
+        let mut task = self.new_flush_task(&region, FlushReason::Manual, row_group_size);
         task.push_sender(sender);
         if let Err(e) =
             self.flush_scheduler
@@ -92,7 +93,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             if region.last_flush_millis() < min_last_flush_time {
                 // If flush time of this region is earlier than `min_last_flush_time`,
                 // we can flush this region.
-                let task = self.new_flush_task(region, FlushReason::EngineFull);
+                let task = self.new_flush_task(region, FlushReason::EngineFull, None);
                 self.flush_scheduler.schedule_flush(
                     region.region_id,
                     &region.version_control,
@@ -105,7 +106,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
         if let Some(region) = max_mem_region {
             if !self.flush_scheduler.is_flush_requested(region.region_id) {
-                let task = self.new_flush_task(region, FlushReason::EngineFull);
+                let task = self.new_flush_task(region, FlushReason::EngineFull, None);
                 self.flush_scheduler.schedule_flush(
                     region.region_id,
                     &region.version_control,
@@ -122,6 +123,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         &self,
         region: &MitoRegionRef,
         reason: FlushReason,
+        row_group_size: Option<usize>,
     ) -> RegionFlushTask {
         // TODO(yingwen): metrics for flush requested.
         RegionFlushTask {
@@ -133,6 +135,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             memtable_builder: self.memtable_builder.clone(),
             file_purger: region.file_purger.clone(),
             listener: self.listener.clone(),
+            row_group_size,
         }
     }
 }
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 3e202912c743..26fe4cae9eef 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -110,7 +110,9 @@ impl RegionRequest {
             )]),
             region_request::Body::Flush(flush) => Ok(vec![(
                 flush.region_id.into(),
-                Self::Flush(RegionFlushRequest {}),
+                Self::Flush(RegionFlushRequest {
+                    row_group_size: None,
+                }),
             )]),
             region_request::Body::Compact(compact) => Ok(vec![(
                 compact.region_id.into(),
@@ -415,8 +417,10 @@ impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
     }
 }
 
-#[derive(Debug)]
-pub struct RegionFlushRequest {}
+#[derive(Debug, Default)]
+pub struct RegionFlushRequest {
+    pub row_group_size: Option<usize>,
+}
 
 #[derive(Debug)]
 pub struct RegionCompactRequest {}
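Note: deriving `Default` keeps call sites that want engine defaults terse; both forms below are equivalent (a trivial sketch):

    use store_api::region_request::RegionFlushRequest;

    fn default_flush_requests() -> (RegionFlushRequest, RegionFlushRequest) {
        (
            RegionFlushRequest {
                row_group_size: None,
            },
            RegionFlushRequest::default(),
        )
    }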
diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs
index e11117f8066b..46208b951251 100644
--- a/src/table/src/predicate.rs
+++ b/src/table/src/predicate.rs
@@ -20,7 +20,7 @@ use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datafusion::parquet::file::metadata::RowGroupMetaData;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion_common::ToDFSchema;
 use datafusion_expr::expr::InList;
 use datafusion_expr::{Between, BinaryExpr, Operator};
@@ -37,7 +37,7 @@ mod stats;
 
 #[derive(Clone)]
 pub struct Predicate {
-    /// The schema of underlying storage.
+    /// The schema of the table that the expressions are applied to.
     schema: SchemaRef,
     /// Physical expressions of this predicate.
     exprs: Vec<Arc<dyn PhysicalExpr>>,
@@ -118,6 +118,31 @@ impl Predicate {
         }
         res
     }
+
+    /// Evaluates the predicate against the `stats`.
+    /// Returns a vector of booleans; `false` means the row group can be skipped.
+    pub fn prune_with_stats<S: PruningStatistics>(&self, stats: &S) -> Vec<bool> {
+        let mut res = vec![true; stats.num_containers()];
+        let arrow_schema = self.schema.arrow_schema();
+        for expr in &self.exprs {
+            match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
+                Ok(p) => match p.prune(stats) {
+                    Ok(r) => {
+                        for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
+                            *res &= curr_val
+                        }
+                    }
+                    Err(e) => {
+                        warn!("Failed to prune row groups, error: {:?}", e);
+                    }
+                },
+                Err(e) => {
+                    error!("Failed to create predicate for expr, error: {:?}", e);
+                }
+            }
+        }
+        res
+    }
 }
 
 // Tests for `TimeRangePredicateBuilder` are located in src/query/tests/time_range_filter_test.rs.
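Note: `prune_with_stats` fails open: every row group starts as `true` (keep), each expression can only AND in `false`s, and any error while building or evaluating a `PruningPredicate` is logged and skipped. A pruning failure therefore costs performance, never correctness. The combining step in isolation:

    /// ANDs one expression's verdicts into the running result.
    fn combine(acc: &mut [bool], verdicts: &[bool]) {
        for (a, v) in acc.iter_mut().zip(verdicts) {
            *a &= *v;
        }
    }

    #[test]
    fn errors_keep_row_groups() {
        let mut acc = vec![true, true, true];
        combine(&mut acc, &[false, true, true]); // expr1 prunes group 0
        // expr2 errored: no combine call, so nothing else is pruned.
        assert_eq!(acc, [false, true, true]);
    }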