diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index a4a919f20d0f..c0d36f1fc4d7 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`min_statistics`] and [`max_statistics`] convert statistics in parquet format to arrow [`ArrayRef`]. +//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`]. // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 @@ -542,8 +542,11 @@ pub(crate) fn parquet_column<'a>( Some((parquet_idx, field)) } -/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] -pub(crate) fn min_statistics<'a, I: Iterator>>( +/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an +/// [`ArrayRef`] +/// +/// This is an internal helper -- see [`StatisticsConverter`] for public API +fn min_statistics<'a, I: Iterator>>( data_type: &DataType, iterator: I, ) -> Result { @@ -551,7 +554,9 @@ pub(crate) fn min_statistics<'a, I: Iterator>>( +/// +/// This is an internal helper -- see [`StatisticsConverter`] for public API +fn max_statistics<'a, I: Iterator>>( data_type: &DataType, iterator: I, ) -> Result { @@ -1425,9 +1430,10 @@ mod test { assert_eq!(idx, 2); let row_groups = metadata.row_groups(); - let iter = row_groups.iter().map(|x| x.column(idx).statistics()); + let converter = + StatisticsConverter::try_new("int_col", &schema, parquet_schema).unwrap(); - let min = min_statistics(&DataType::Int32, iter.clone()).unwrap(); + let min = converter.row_group_mins(row_groups.iter()).unwrap(); assert_eq!( &min, &expected_min, @@ -1435,7 +1441,7 @@ mod test { DisplayStats(row_groups) ); - let max = max_statistics(&DataType::Int32, iter).unwrap(); + let max = converter.row_group_maxes(row_groups.iter()).unwrap(); assert_eq!( &max, &expected_max, @@ -1623,22 +1629,23 @@ mod test { continue; } - let (idx, f) = - parquet_column(parquet_schema, &schema, field.name()).unwrap(); - assert_eq!(f, field); + let converter = + StatisticsConverter::try_new(field.name(), &schema, parquet_schema) + .unwrap(); - let iter = row_groups.iter().map(|x| x.column(idx).statistics()); - let min = min_statistics(f.data_type(), iter.clone()).unwrap(); + assert_eq!(converter.arrow_field, field.as_ref()); + + let mins = converter.row_group_mins(row_groups.iter()).unwrap(); assert_eq!( - &min, + &mins, &expected_min, "Min. Statistics\n\n{}\n\n", DisplayStats(row_groups) ); - let max = max_statistics(f.data_type(), iter).unwrap(); + let maxes = converter.row_group_maxes(row_groups.iter()).unwrap(); assert_eq!( - &max, + &maxes, &expected_max, "Max. Statistics\n\n{}\n\n", DisplayStats(row_groups) @@ -1705,7 +1712,7 @@ mod test { self } - /// Reads the specified parquet file and validates that the exepcted min/max + /// Reads the specified parquet file and validates that the expected min/max /// values for the specified columns are as expected. fn run(self) { let path = PathBuf::from(parquet_test_data()).join(self.file_name); @@ -1723,14 +1730,13 @@ mod test { expected_max, } = expected_column; - let (idx, field) = - parquet_column(parquet_schema, arrow_schema, name).unwrap(); - - let iter = row_groups.iter().map(|x| x.column(idx).statistics()); - let actual_min = min_statistics(field.data_type(), iter.clone()).unwrap(); + let converter = + StatisticsConverter::try_new(name, arrow_schema, parquet_schema) + .unwrap(); + let actual_min = converter.row_group_mins(row_groups.iter()).unwrap(); assert_eq!(&expected_min, &actual_min, "column {name}"); - let actual_max = max_statistics(field.data_type(), iter).unwrap(); + let actual_max = converter.row_group_maxes(row_groups.iter()).unwrap(); assert_eq!(&expected_max, &actual_max, "column {name}"); } }