From 2f4347647172f6997448b2e24d322b50c856f3a0 Mon Sep 17 00:00:00 2001 From: Marvin Lanhenke <62298609+marvinlanhenke@users.noreply.github.com> Date: Sat, 15 Jun 2024 17:58:44 +0200 Subject: [PATCH] Initial Extract parquet data page statistics API (#10852) * feat: enable page statistics * feat: prototype int64 data_page_min * feat: prototype MinInt64DataPageStatsIterator * feat: add make_data_page_stats_iterator macro * feat: add get_data_page_statistics macro * feat: add MaxInt64DataPageStatsIterator * feat: add test_data_page_stats param * chore: add testcase int64_with_nulls * feat: add data page null_counts * fix: clippy * chore: rename column_page_index * feat: add data page row counts * feat: add num_data_pages to iterator * chore: update docs * fix: use colum_offset len in data_page_null_counts * fix: docs * tweak comments * update test helper * Add explicit multi-data page tests to statistics test * Add explicit data page test * remove duplicate test * update coverage --------- Co-authored-by: Andrew Lamb --- .../physical_plan/parquet/statistics.rs | 315 +++++++++++- .../core/tests/parquet/arrow_statistics.rs | 479 +++++++++++++----- datafusion/core/tests/parquet/mod.rs | 3 +- 3 files changed, 657 insertions(+), 140 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index c0d36f1fc4d7..a2e0d8fa66be 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -33,7 +33,8 @@ use arrow_array::{ use arrow_schema::{Field, FieldRef, Schema, TimeUnit}; use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; use half::f16; -use parquet::file::metadata::RowGroupMetaData; +use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; +use parquet::file::page_index::index::Index; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; use paste::paste; @@ -517,6 +518,74 @@ macro_rules! get_statistics { }}} } +macro_rules! make_data_page_stats_iterator { + ($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => { + struct $iterator_type<'a, I> + where + I: Iterator, + { + iter: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator, + { + fn new(iter: I) -> Self { + Self { iter } + } + } + + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator, + { + type Item = Vec>; + + fn next(&mut self) -> Option { + let next = self.iter.next(); + match next { + Some((len, index)) => match index { + $index_type(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| x.$func) + .collect::>(), + ), + // No matching `Index` found; + // thus no statistics that can be extracted. + // We return vec![None; len] to effectively + // create an arrow null-array with the length + // corresponding to the number of entries in + // `ParquetOffsetIndex` per row group per column. + _ => Some(vec![None; len]), + }, + _ => None, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } + } + }; +} + +make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64); +make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64); + +macro_rules! get_data_page_statistics { + ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { + paste! { + match $data_type { + Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), + _ => unimplemented!() + } + } + } +} + /// Lookups up the parquet column by name /// /// Returns the parquet column index and the corresponding arrow field @@ -563,6 +632,51 @@ fn max_statistics<'a, I: Iterator>>( get_statistics!(Max, data_type, iterator) } +/// Extracts the min statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +pub(crate) fn min_page_statistics<'a, I>( + data_type: Option<&DataType>, + iterator: I, +) -> Result +where + I: Iterator, +{ + get_data_page_statistics!(Min, data_type, iterator) +} + +/// Extracts the max statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +pub(crate) fn max_page_statistics<'a, I>( + data_type: Option<&DataType>, + iterator: I, +) -> Result +where + I: Iterator, +{ + get_data_page_statistics!(Max, data_type, iterator) +} + +/// Extracts the null count statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +/// +/// The returned Array is an [`UInt64Array`] +pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result +where + I: Iterator, +{ + let iter = iterator.flat_map(|(len, index)| match index { + Index::NONE => vec![None; len], + Index::INT64(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::>(), + _ => unimplemented!(), + }); + + Ok(Arc::new(UInt64Array::from_iter(iter))) +} + /// Extracts Parquet statistics as Arrow arrays /// /// This is used to convert Parquet statistics to Arrow arrays, with proper type @@ -771,10 +885,205 @@ impl<'a> StatisticsConverter<'a> { Ok(Arc::new(UInt64Array::from_iter(null_counts))) } + /// Extract the minimum values from Data Page statistics. + /// + /// In Parquet files, in addition to the Column Chunk level statistics + /// (stored for each column for each row group) there are also + /// optional statistics stored for each data page, as part of + /// the [`ParquetColumnIndex`]. + /// + /// Since a single Column Chunk is stored as one or more pages, + /// page level statistics can prune at a finer granularity. + /// + /// However since they are stored in a separate metadata + /// structure ([`Index`]) there is different code to extract them as + /// compared to arrow statistics. + /// + /// # Parameters: + /// + /// * `column_page_index`: The parquet column page indices, read from + /// `ParquetMetaData` column_index + /// + /// * `column_offset_index`: The parquet column offset indices, read from + /// `ParquetMetaData` offset_index + /// + /// * `row_group_indices`: The indices of the row groups, that are used to + /// extract the column page index and offset index on a per row group + /// per column basis. + /// + /// # Return Value + /// + /// The returned array contains 1 value for each `NativeIndex` + /// in the underlying `Index`es, in the same order as they appear + /// in `metadatas`. + /// + /// For example, if there are two `Index`es in `metadatas`: + /// 1. the first having `3` `PageIndex` entries + /// 2. the second having `2` `PageIndex` entries + /// + /// The returned array would have 5 rows. + /// + /// Each value is either: + /// * the minimum value for the page + /// * a null value, if the statistics can not be extracted + /// + /// Note that a null value does NOT mean the min value was actually + /// `null` it means it the requested statistic is unknown + /// + /// # Errors + /// + /// Reasons for not being able to extract the statistics include: + /// * the column is not present in the parquet file + /// * statistics for the pages are not present in the row group + /// * the stored statistic value can not be converted to the requested type + pub fn data_page_mins( + &self, + column_page_index: &ParquetColumnIndex, + column_offset_index: &ParquetOffsetIndex, + row_group_indices: I, + ) -> Result + where + I: IntoIterator, + { + let data_type = self.arrow_field.data_type(); + + let Some(parquet_index) = self.parquet_index else { + return Ok(self.make_null_array(data_type, row_group_indices)); + }; + + let iter = row_group_indices.into_iter().map(|rg_index| { + let column_page_index_per_row_group_per_column = + &column_page_index[*rg_index][parquet_index]; + let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + + (*num_data_pages, column_page_index_per_row_group_per_column) + }); + + min_page_statistics(Some(data_type), iter) + } + + /// Extract the maximum values from Data Page statistics. + /// + /// See docs on [`Self::data_page_mins`] for details. + pub fn data_page_maxes( + &self, + column_page_index: &ParquetColumnIndex, + column_offset_index: &ParquetOffsetIndex, + row_group_indices: I, + ) -> Result + where + I: IntoIterator, + { + let data_type = self.arrow_field.data_type(); + + let Some(parquet_index) = self.parquet_index else { + return Ok(self.make_null_array(data_type, row_group_indices)); + }; + + let iter = row_group_indices.into_iter().map(|rg_index| { + let column_page_index_per_row_group_per_column = + &column_page_index[*rg_index][parquet_index]; + let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + + (*num_data_pages, column_page_index_per_row_group_per_column) + }); + + max_page_statistics(Some(data_type), iter) + } + + /// Extract the null counts from Data Page statistics. + /// + /// The returned Array is an [`UInt64Array`] + /// + /// See docs on [`Self::data_page_mins`] for details. + pub fn data_page_null_counts( + &self, + column_page_index: &ParquetColumnIndex, + column_offset_index: &ParquetOffsetIndex, + row_group_indices: I, + ) -> Result + where + I: IntoIterator, + { + let data_type = self.arrow_field.data_type(); + + let Some(parquet_index) = self.parquet_index else { + return Ok(self.make_null_array(data_type, row_group_indices)); + }; + + let iter = row_group_indices.into_iter().map(|rg_index| { + let column_page_index_per_row_group_per_column = + &column_page_index[*rg_index][parquet_index]; + let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + + (*num_data_pages, column_page_index_per_row_group_per_column) + }); + null_counts_page_statistics(iter) + } + + /// Returns an [`ArrayRef`] with row counts for each row group. + /// + /// This function iterates over the given row group indexes and computes + /// the row count for each page in the specified column. + /// + /// # Parameters: + /// + /// * `column_offset_index`: The parquet column offset indices, read from + /// `ParquetMetaData` offset_index + /// + /// * `row_group_metadatas`: The metadata slice of the row groups, read + /// from `ParquetMetaData` row_groups + /// + /// * `row_group_indices`: The indices of the row groups, that are used to + /// extract the column offset index on a per row group per column basis. + /// + /// See docs on [`Self::data_page_mins`] for details. + pub fn data_page_row_counts( + &self, + column_offset_index: &ParquetOffsetIndex, + row_group_metadatas: &[RowGroupMetaData], + row_group_indices: I, + ) -> Result + where + I: IntoIterator, + { + let data_type = self.arrow_field.data_type(); + + let Some(parquet_index) = self.parquet_index else { + return Ok(self.make_null_array(data_type, row_group_indices)); + }; + + // `offset_index[row_group_number][column_number][page_number]` holds + // the [`PageLocation`] corresponding to page `page_number` of column + // `column_number`of row group `row_group_number`. + let mut row_count_total = Vec::new(); + for rg_idx in row_group_indices { + let page_locations = &column_offset_index[*rg_idx][parquet_index]; + + let row_count_per_page = page_locations.windows(2).map(|loc| { + Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64) + }); + + let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows(); + + // append the last page row count + let row_count_per_page = row_count_per_page + .chain(std::iter::once(Some( + *num_rows_in_row_group as u64 + - page_locations.last().unwrap().first_row_index as u64, + ))) + .collect::>(); + + row_count_total.extend(row_count_per_page); + } + + Ok(Arc::new(UInt64Array::from_iter(row_count_total))) + } + /// Returns a null array of data_type with one element per row group - fn make_null_array(&self, data_type: &DataType, metadatas: I) -> ArrayRef + fn make_null_array(&self, data_type: &DataType, metadatas: I) -> ArrayRef where - I: IntoIterator, + I: IntoIterator, { // column was in the arrow schema but not in the parquet schema, so return a null array let num_row_groups = metadatas.into_iter().count(); diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 2ea18d7cf823..3c812800e2b7 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -18,6 +18,7 @@ //! This file contains an end to end test of extracting statitics from parquet files. //! It writes data into a parquet file, reads statistics and verifies they are correct +use std::default::Default; use std::fs::File; use std::sync::Arc; @@ -39,102 +40,102 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Schema}; use datafusion::datasource::physical_plan::parquet::StatisticsConverter; use half::f16; -use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder}; +use parquet::arrow::arrow_reader::{ + ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, +}; use parquet::arrow::ArrowWriter; use parquet::file::properties::{EnabledStatistics, WriterProperties}; use super::make_test_file_rg; -// TEST HELPERS - -/// Return a record batch with i64 with Null values -fn make_int64_batches_with_null( +#[derive(Debug, Default, Clone)] +struct Int64Case { + /// Number of nulls in the column null_values: usize, + /// Non null values in the range `[no_null_values_start, + /// no_null_values_end]`, one value for each row no_null_values_start: i64, no_null_values_end: i64, -) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new("i64", DataType::Int64, true)])); - - let v64: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); - - RecordBatch::try_new( - schema, - vec![make_array( - Int64Array::from_iter( - v64.into_iter() - .map(Some) - .chain(std::iter::repeat(None).take(null_values)), - ) - .to_data(), - )], - ) - .unwrap() -} - -// Create a parquet file with one column for data type i64 -// Data of the file include -// . Number of null rows is the given num_null -// . There are non-null values in the range [no_null_values_start, no_null_values_end], one value each row -// . The file is divided into row groups of size row_per_group -pub fn parquet_file_one_column( - num_null: usize, - no_null_values_start: i64, - no_null_values_end: i64, + /// Number of rows per row group row_per_group: usize, -) -> ParquetRecordBatchReaderBuilder { - parquet_file_one_column_stats( - num_null, - no_null_values_start, - no_null_values_end, - row_per_group, - EnabledStatistics::Chunk, - ) + /// if specified, overrides default statistics settings + enable_stats: Option, + /// If specified, the number of values in each page + data_page_row_count_limit: Option, } -// Create a parquet file with one column for data type i64 -// Data of the file include -// . Number of null rows is the given num_null -// . There are non-null values in the range [no_null_values_start, no_null_values_end], one value each row -// . The file is divided into row groups of size row_per_group -// . Statistics are enabled/disabled based on the given enable_stats -pub fn parquet_file_one_column_stats( - num_null: usize, - no_null_values_start: i64, - no_null_values_end: i64, - row_per_group: usize, - enable_stats: EnabledStatistics, -) -> ParquetRecordBatchReaderBuilder { - let mut output_file = tempfile::Builder::new() - .prefix("parquert_statistics_test") - .suffix(".parquet") - .tempfile() - .expect("tempfile creation"); - - let props = WriterProperties::builder() - .set_max_row_group_size(row_per_group) - .set_statistics_enabled(enable_stats) - .build(); - - let batches = vec![make_int64_batches_with_null( - num_null, - no_null_values_start, - no_null_values_end, - )]; - - let schema = batches[0].schema(); - - let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); +impl Int64Case { + /// Return a record batch with i64 with Null values + /// The first no_null_values_end - no_null_values_start values + /// are non-null with the specified range, the rest are null + fn make_int64_batches_with_null(&self) -> RecordBatch { + let schema = + Arc::new(Schema::new(vec![Field::new("i64", DataType::Int64, true)])); + + let v64: Vec = + (self.no_null_values_start as _..self.no_null_values_end as _).collect(); + + RecordBatch::try_new( + schema, + vec![make_array( + Int64Array::from_iter( + v64.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(self.null_values)), + ) + .to_data(), + )], + ) + .unwrap() + } + + // Create a parquet file with the specified settings + pub fn build(&self) -> ParquetRecordBatchReaderBuilder { + let mut output_file = tempfile::Builder::new() + .prefix("parquert_statistics_test") + .suffix(".parquet") + .tempfile() + .expect("tempfile creation"); + + let mut builder = + WriterProperties::builder().set_max_row_group_size(self.row_per_group); + if let Some(enable_stats) = self.enable_stats { + builder = builder.set_statistics_enabled(enable_stats); + } + if let Some(data_page_row_count_limit) = self.data_page_row_count_limit { + builder = builder.set_data_page_row_count_limit(data_page_row_count_limit); + } + let props = builder.build(); + + let batches = vec![self.make_int64_batches_with_null()]; + + let schema = batches[0].schema(); + + let mut writer = + ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); + + // if we have a datapage limit send the batches in one at a time to give + // the writer a chance to be split into multiple pages + if self.data_page_row_count_limit.is_some() { + for batch in batches { + for i in 0..batch.num_rows() { + writer.write(&batch.slice(i, 1)).expect("writing batch"); + } + } + } else { + for batch in batches { + writer.write(&batch).expect("writing batch"); + } + } + + // close file + let _file_meta = writer.close().unwrap(); - for batch in batches { - writer.write(&batch).expect("writing batch"); + // open the file & get the reader + let file = output_file.reopen().unwrap(); + let options = ArrowReaderOptions::new().with_page_index(true); + ArrowReaderBuilder::try_new_with_options(file, options).unwrap() } - - // close file - let _file_meta = writer.close().unwrap(); - - // open the file & get the reader - let file = output_file.reopen().unwrap(); - ArrowReaderBuilder::try_new(file).unwrap() } /// Defines what data to create in a parquet file @@ -158,7 +159,8 @@ impl TestReader { // open the file & get the reader let file = file.reopen().unwrap(); - ArrowReaderBuilder::try_new(file).unwrap() + let options = ArrowReaderOptions::new().with_page_index(true); + ArrowReaderBuilder::try_new_with_options(file, options).unwrap() } } @@ -172,6 +174,9 @@ struct Test<'a> { expected_row_counts: UInt64Array, /// Which column to extract statistics from column_name: &'static str, + /// If true, extracts and compares data page statistics rather than row + /// group statistics + test_data_page_statistics: bool, } impl<'a> Test<'a> { @@ -183,6 +188,7 @@ impl<'a> Test<'a> { expected_null_counts, expected_row_counts, column_name, + test_data_page_statistics, } = self; let converter = StatisticsConverter::try_new( @@ -193,36 +199,103 @@ impl<'a> Test<'a> { .unwrap(); let row_groups = reader.metadata().row_groups(); - let min = converter.row_group_mins(row_groups).unwrap(); - - assert_eq!( - &min, &expected_min, - "{column_name}: Mismatch with expected minimums" - ); - let max = converter.row_group_maxes(row_groups).unwrap(); - assert_eq!( - &max, &expected_max, - "{column_name}: Mismatch with expected maximum" - ); - - let null_counts = converter.row_group_null_counts(row_groups).unwrap(); - let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; - assert_eq!( - &null_counts, &expected_null_counts, - "{column_name}: Mismatch with expected null counts. \ - Actual: {null_counts:?}. Expected: {expected_null_counts:?}" - ); - - let row_counts = StatisticsConverter::row_group_row_counts( - reader.metadata().row_groups().iter(), - ) - .unwrap(); - assert_eq!( - row_counts, expected_row_counts, - "{column_name}: Mismatch with expected row counts. \ - Actual: {row_counts:?}. Expected: {expected_row_counts:?}" - ); + if test_data_page_statistics { + let column_page_index = reader + .metadata() + .column_index() + .expect("File should have column page indices"); + + let column_offset_index = reader + .metadata() + .offset_index() + .expect("File should have column offset indices"); + + let row_group_indices = row_groups + .iter() + .enumerate() + .map(|(i, _)| i) + .collect::>(); + + let min = converter + .data_page_mins( + column_page_index, + column_offset_index, + &row_group_indices, + ) + .unwrap(); + assert_eq!( + &min, &expected_min, + "{column_name}: Mismatch with expected data page minimums" + ); + + let max = converter + .data_page_maxes( + column_page_index, + column_offset_index, + &row_group_indices, + ) + .unwrap(); + assert_eq!( + &max, &expected_max, + "{column_name}: Mismatch with expected data page maximum" + ); + + let null_counts = converter + .data_page_null_counts( + column_page_index, + column_offset_index, + &row_group_indices, + ) + .unwrap(); + + let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; + assert_eq!( + &null_counts, &expected_null_counts, + "{column_name}: Mismatch with expected data page null counts. \ + Actual: {null_counts:?}. Expected: {expected_null_counts:?}" + ); + + let row_counts = converter + .data_page_row_counts(column_offset_index, row_groups, &row_group_indices) + .unwrap(); + let expected_row_counts = Arc::new(expected_row_counts) as ArrayRef; + assert_eq!( + &row_counts, &expected_row_counts, + "{column_name}: Mismatch with expected row counts. \ + Actual: {row_counts:?}. Expected: {expected_row_counts:?}" + ); + } else { + let min = converter.row_group_mins(row_groups).unwrap(); + assert_eq!( + &min, &expected_min, + "{column_name}: Mismatch with expected minimums" + ); + + let max = converter.row_group_maxes(row_groups).unwrap(); + assert_eq!( + &max, &expected_max, + "{column_name}: Mismatch with expected maximum" + ); + + let null_counts = converter.row_group_null_counts(row_groups).unwrap(); + let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; + assert_eq!( + &null_counts, &expected_null_counts, + "{column_name}: Mismatch with expected null counts. \ + Actual: {null_counts:?}. Expected: {expected_null_counts:?}" + ); + + let row_counts = StatisticsConverter::row_group_row_counts( + reader.metadata().row_groups().iter(), + ) + .unwrap(); + assert_eq!( + row_counts, expected_row_counts, + "{column_name}: Mismatch with expected row counts. \ + Actual: {row_counts:?}. Expected: {expected_row_counts:?}" + ); + } } /// Run the test and expect a column not found error @@ -234,6 +307,7 @@ impl<'a> Test<'a> { expected_null_counts: _, expected_row_counts: _, column_name, + .. } = self; let converter = StatisticsConverter::try_new( @@ -254,8 +328,15 @@ impl<'a> Test<'a> { #[tokio::test] async fn test_one_row_group_without_null() { - let row_per_group = 20; - let reader = parquet_file_one_column(0, 4, 7, row_per_group); + let reader = Int64Case { + null_values: 0, + no_null_values_start: 4, + no_null_values_end: 7, + row_per_group: 20, + ..Default::default() + } + .build(); + Test { reader: &reader, // min is 4 @@ -267,14 +348,21 @@ async fn test_one_row_group_without_null() { // 3 rows expected_row_counts: UInt64Array::from(vec![3]), column_name: "i64", + test_data_page_statistics: false, } .run() } #[tokio::test] async fn test_one_row_group_with_null_and_negative() { - let row_per_group = 20; - let reader = parquet_file_one_column(2, -1, 5, row_per_group); + let reader = Int64Case { + null_values: 2, + no_null_values_start: -1, + no_null_values_end: 5, + row_per_group: 20, + ..Default::default() + } + .build(); Test { reader: &reader, @@ -287,14 +375,21 @@ async fn test_one_row_group_with_null_and_negative() { // 8 rows expected_row_counts: UInt64Array::from(vec![8]), column_name: "i64", + test_data_page_statistics: false, } .run() } #[tokio::test] async fn test_two_row_group_with_null() { - let row_per_group = 10; - let reader = parquet_file_one_column(2, 4, 17, row_per_group); + let reader = Int64Case { + null_values: 2, + no_null_values_start: 4, + no_null_values_end: 17, + row_per_group: 10, + ..Default::default() + } + .build(); Test { reader: &reader, @@ -307,14 +402,21 @@ async fn test_two_row_group_with_null() { // row counts are [10, 5] expected_row_counts: UInt64Array::from(vec![10, 5]), column_name: "i64", + test_data_page_statistics: false, } .run() } #[tokio::test] async fn test_two_row_groups_with_all_nulls_in_one() { - let row_per_group = 5; - let reader = parquet_file_one_column(4, -2, 2, row_per_group); + let reader = Int64Case { + null_values: 4, + no_null_values_start: -2, + no_null_values_end: 2, + row_per_group: 5, + ..Default::default() + } + .build(); Test { reader: &reader, @@ -327,6 +429,38 @@ async fn test_two_row_groups_with_all_nulls_in_one() { // row counts are [5, 3] expected_row_counts: UInt64Array::from(vec![5, 3]), column_name: "i64", + test_data_page_statistics: false, + } + .run() +} + +#[tokio::test] +async fn test_multiple_data_pages_nulls_and_negatives() { + let reader = Int64Case { + null_values: 3, + no_null_values_start: -1, + no_null_values_end: 10, + row_per_group: 20, + // limit page row count to 4 + data_page_row_count_limit: Some(4), + enable_stats: Some(EnabledStatistics::Page), + } + .build(); + + // Data layout looks like this: + // + // page 0: [-1, 0, 1, 2] + // page 1: [3, 4, 5, 6] + // page 2: [7, 8, 9, null] + // page 3: [null, null] + Test { + reader: &reader, + expected_min: Arc::new(Int64Array::from(vec![Some(-1), Some(3), Some(7), None])), + expected_max: Arc::new(Int64Array::from(vec![Some(2), Some(6), Some(9), None])), + expected_null_counts: UInt64Array::from(vec![0, 0, 1, 2]), + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 2]), + column_name: "i64", + test_data_page_statistics: true, } .run() } @@ -347,19 +481,23 @@ async fn test_int_64() { .build() .await; - Test { - reader: &reader, - // mins are [-5, -4, 0, 5] - expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])), - // maxes are [-1, 0, 4, 9] - expected_max: Arc::new(Int64Array::from(vec![-1, 0, 4, 9])), - // nulls are [0, 0, 0, 0] - expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), - // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), - column_name: "i64", + // since each row has only one data page, the statistics are the same + for test_data_page_statistics in [true, false] { + Test { + reader: &reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int64Array::from(vec![-1, 0, 4, 9])), + // nulls are [0, 0, 0, 0] + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 5, 5] + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + column_name: "i64", + test_data_page_statistics, + } + .run(); } - .run(); } #[tokio::test] @@ -383,6 +521,7 @@ async fn test_int_32() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "i32", + test_data_page_statistics: false, } .run(); } @@ -423,6 +562,7 @@ async fn test_int_16() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "i16", + test_data_page_statistics: false, } .run(); } @@ -451,6 +591,7 @@ async fn test_int_8() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "i8", + test_data_page_statistics: false, } .run(); } @@ -500,6 +641,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "nanos", + test_data_page_statistics: false, } .run(); @@ -528,6 +670,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "nanos_timezoned", + test_data_page_statistics: false, } .run(); @@ -549,6 +692,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "micros", + test_data_page_statistics: false, } .run(); @@ -577,6 +721,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "micros_timezoned", + test_data_page_statistics: false, } .run(); @@ -598,6 +743,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "millis", + test_data_page_statistics: false, } .run(); @@ -626,6 +772,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "millis_timezoned", + test_data_page_statistics: false, } .run(); @@ -647,6 +794,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "seconds", + test_data_page_statistics: false, } .run(); @@ -675,6 +823,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "seconds_timezoned", + test_data_page_statistics: false, } .run(); } @@ -720,6 +869,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "nanos", + test_data_page_statistics: false, } .run(); @@ -746,6 +896,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "nanos_timezoned", + test_data_page_statistics: false, } .run(); @@ -765,6 +916,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "micros", + test_data_page_statistics: false, } .run(); @@ -791,6 +943,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "micros_timezoned", + test_data_page_statistics: false, } .run(); @@ -810,6 +963,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "millis", + test_data_page_statistics: false, } .run(); @@ -836,6 +990,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "millis_timezoned", + test_data_page_statistics: false, } .run(); @@ -855,6 +1010,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "seconds", + test_data_page_statistics: false, } .run(); @@ -881,6 +1037,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "seconds_timezoned", + test_data_page_statistics: false, } .run(); } @@ -918,6 +1075,7 @@ async fn test_dates_32_diff_rg_sizes() { // row counts are [13, 7] expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "date32", + test_data_page_statistics: false, } .run(); } @@ -940,6 +1098,7 @@ async fn test_time32_second_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), column_name: "second", + test_data_page_statistics: false, } .run(); } @@ -966,6 +1125,7 @@ async fn test_time32_millisecond_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), column_name: "millisecond", + test_data_page_statistics: false, } .run(); } @@ -998,6 +1158,7 @@ async fn test_time64_microsecond_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), column_name: "microsecond", + test_data_page_statistics: false, } .run(); } @@ -1030,6 +1191,7 @@ async fn test_time64_nanosecond_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), column_name: "nanosecond", + test_data_page_statistics: false, } .run(); } @@ -1056,6 +1218,7 @@ async fn test_dates_64_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![2, 2]), expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "date64", + test_data_page_statistics: false, } .run(); } @@ -1083,6 +1246,7 @@ async fn test_uint() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), column_name: "u8", + test_data_page_statistics: false, } .run(); @@ -1093,6 +1257,7 @@ async fn test_uint() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), column_name: "u16", + test_data_page_statistics: false, } .run(); @@ -1103,6 +1268,7 @@ async fn test_uint() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), column_name: "u32", + test_data_page_statistics: false, } .run(); @@ -1113,6 +1279,7 @@ async fn test_uint() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), column_name: "u64", + test_data_page_statistics: false, } .run(); } @@ -1135,6 +1302,7 @@ async fn test_int32_range() { expected_null_counts: UInt64Array::from(vec![0]), expected_row_counts: UInt64Array::from(vec![4]), column_name: "i", + test_data_page_statistics: false, } .run(); } @@ -1157,6 +1325,7 @@ async fn test_uint32_range() { expected_null_counts: UInt64Array::from(vec![0]), expected_row_counts: UInt64Array::from(vec![4]), column_name: "u", + test_data_page_statistics: false, } .run(); } @@ -1178,6 +1347,7 @@ async fn test_numeric_limits_unsigned() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "u8", + test_data_page_statistics: false, } .run(); @@ -1188,6 +1358,7 @@ async fn test_numeric_limits_unsigned() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "u16", + test_data_page_statistics: false, } .run(); @@ -1198,6 +1369,7 @@ async fn test_numeric_limits_unsigned() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "u32", + test_data_page_statistics: false, } .run(); @@ -1208,6 +1380,7 @@ async fn test_numeric_limits_unsigned() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "u64", + test_data_page_statistics: false, } .run(); } @@ -1229,6 +1402,7 @@ async fn test_numeric_limits_signed() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "i8", + test_data_page_statistics: false, } .run(); @@ -1239,6 +1413,7 @@ async fn test_numeric_limits_signed() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "i16", + test_data_page_statistics: false, } .run(); @@ -1249,6 +1424,7 @@ async fn test_numeric_limits_signed() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "i32", + test_data_page_statistics: false, } .run(); @@ -1259,6 +1435,7 @@ async fn test_numeric_limits_signed() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "i64", + test_data_page_statistics: false, } .run(); } @@ -1280,6 +1457,7 @@ async fn test_numeric_limits_float() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "f32", + test_data_page_statistics: false, } .run(); @@ -1290,6 +1468,7 @@ async fn test_numeric_limits_float() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "f64", + test_data_page_statistics: false, } .run(); @@ -1300,6 +1479,7 @@ async fn test_numeric_limits_float() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "f32_nan", + test_data_page_statistics: false, } .run(); @@ -1310,6 +1490,7 @@ async fn test_numeric_limits_float() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "f64_nan", + test_data_page_statistics: false, } .run(); } @@ -1332,6 +1513,7 @@ async fn test_float64() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "f", + test_data_page_statistics: false, } .run(); } @@ -1364,6 +1546,7 @@ async fn test_float16() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "f", + test_data_page_statistics: false, } .run(); } @@ -1394,6 +1577,7 @@ async fn test_decimal() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "decimal_col", + test_data_page_statistics: false, } .run(); } @@ -1431,6 +1615,7 @@ async fn test_decimal_256() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "decimal256_col", + test_data_page_statistics: false, } .run(); } @@ -1450,6 +1635,7 @@ async fn test_dictionary() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "string_dict_i8", + test_data_page_statistics: false, } .run(); @@ -1460,6 +1646,7 @@ async fn test_dictionary() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "string_dict_i32", + test_data_page_statistics: false, } .run(); @@ -1470,6 +1657,7 @@ async fn test_dictionary() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "int_dict_i8", + test_data_page_statistics: false, } .run(); } @@ -1507,6 +1695,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "name", + test_data_page_statistics: false, } .run(); @@ -1526,6 +1715,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service_string", + test_data_page_statistics: false, } .run(); @@ -1544,6 +1734,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service_binary", + test_data_page_statistics: false, } .run(); @@ -1564,6 +1755,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service_fixedsize", + test_data_page_statistics: false, } .run(); @@ -1584,6 +1776,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service_large_binary", + test_data_page_statistics: false, } .run(); } @@ -1616,6 +1809,7 @@ async fn test_period_in_column_names() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "name", + test_data_page_statistics: false, } .run(); @@ -1629,6 +1823,7 @@ async fn test_period_in_column_names() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service.name", + test_data_page_statistics: false, } .run(); } @@ -1652,6 +1847,7 @@ async fn test_boolean() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 5]), column_name: "bool", + test_data_page_statistics: false, } .run(); } @@ -1678,6 +1874,7 @@ async fn test_struct() { expected_null_counts: UInt64Array::from(vec![0]), expected_row_counts: UInt64Array::from(vec![3]), column_name: "struct", + test_data_page_statistics: false, } .run(); } @@ -1700,6 +1897,7 @@ async fn test_utf8() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 5]), column_name: "utf8", + test_data_page_statistics: false, } .run(); @@ -1711,6 +1909,7 @@ async fn test_utf8() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 5]), column_name: "large_utf8", + test_data_page_statistics: false, } .run(); } @@ -1719,9 +1918,15 @@ async fn test_utf8() { #[tokio::test] async fn test_missing_statistics() { - let row_per_group = 5; - let reader = - parquet_file_one_column_stats(0, 4, 7, row_per_group, EnabledStatistics::None); + let reader = Int64Case { + null_values: 0, + no_null_values_start: 4, + no_null_values_end: 7, + row_per_group: 5, + enable_stats: Some(EnabledStatistics::None), + ..Default::default() + } + .build(); Test { reader: &reader, @@ -1730,6 +1935,7 @@ async fn test_missing_statistics() { expected_null_counts: UInt64Array::from(vec![None]), expected_row_counts: UInt64Array::from(vec![3]), // stil has row count statistics column_name: "i64", + test_data_page_statistics: false, } .run(); } @@ -1751,6 +1957,7 @@ async fn test_column_not_found() { expected_null_counts: UInt64Array::from(vec![2, 2]), expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "not_a_column", + test_data_page_statistics: false, } .run_col_not_found(); } diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 9546ab30c9e0..0434a271c32e 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -43,7 +43,7 @@ use datafusion::{ use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; use half::f16; use parquet::arrow::ArrowWriter; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{EnabledStatistics, WriterProperties}; use std::sync::Arc; use tempfile::NamedTempFile; @@ -1349,6 +1349,7 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem let props = WriterProperties::builder() .set_max_row_group_size(row_per_group) .set_bloom_filter_enabled(true) + .set_statistics_enabled(EnabledStatistics::Page) .build(); let batches = create_data_batch(scenario);