diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 572904254fd75..4204593eba96d 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -25,25 +25,20 @@ use std::sync::Arc; use super::write::demux::start_demuxer_task; use super::write::{create_writer, SharedBuffer}; use super::{FileFormat, FileScanConfig}; -use crate::arrow::array::{ - BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, -}; -use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef}; +use crate::arrow::array::RecordBatch; +use crate::arrow::datatypes::{Fields, Schema, SchemaRef}; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig}; -use crate::datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapterFactory, -}; use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::{ Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Statistics, }; +use arrow::compute::sum; use datafusion_common::config::TableParquetOptions; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::stats::Precision; @@ -52,11 +47,13 @@ use datafusion_common::{ }; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; +use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; +use log::debug; use object_store::buffered::BufWriter; use parquet::arrow::arrow_writer::{ compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, @@ -66,16 +63,17 @@ use parquet::arrow::{ arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, }; use parquet::file::footer::{decode_footer, decode_metadata}; -use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::properties::WriterProperties; -use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::file::writer::SerializedFileWriter; use parquet::format::FileMetaData; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinSet; -use crate::datasource::physical_plan::parquet::ParquetExecBuilder; +use crate::datasource::physical_plan::parquet::{ + ParquetExecBuilder, StatisticsConverter, +}; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; use object_store::path::Path; @@ -295,86 +293,6 @@ impl FileFormat for ParquetFormat { } } -fn summarize_min_max( - max_values: &mut [Option], - min_values: &mut [Option], - fields: &Fields, - i: usize, - stat: &ParquetStatistics, -) { - if !stat.has_min_max_set() { - max_values[i] = None; - min_values[i] = None; - return; - } - match stat { - ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => { - if let Some(max_value) = &mut max_values[i] { - max_value - .update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))]) - .unwrap_or_else(|_| max_values[i] = None); - } - if let Some(min_value) = &mut min_values[i] { - min_value - .update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))]) - .unwrap_or_else(|_| min_values[i] = None); - } - } - ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => { - if let Some(max_value) = &mut max_values[i] { - max_value - .update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))]) - .unwrap_or_else(|_| max_values[i] = None); - } - if let Some(min_value) = &mut min_values[i] { - min_value - .update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))]) - .unwrap_or_else(|_| min_values[i] = None); - } - } - ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => { - if let Some(max_value) = &mut max_values[i] { - max_value - .update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))]) - .unwrap_or_else(|_| max_values[i] = None); - } - if let Some(min_value) = &mut min_values[i] { - min_value - .update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))]) - .unwrap_or_else(|_| min_values[i] = None); - } - } - ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => { - if let Some(max_value) = &mut max_values[i] { - max_value - .update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))]) - .unwrap_or_else(|_| max_values[i] = None); - } - if let Some(min_value) = &mut min_values[i] { - min_value - .update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))]) - .unwrap_or_else(|_| min_values[i] = None); - } - } - ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => { - if let Some(max_value) = &mut max_values[i] { - max_value - .update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))]) - .unwrap_or_else(|_| max_values[i] = None); - } - if let Some(min_value) = &mut min_values[i] { - min_value - .update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))]) - .unwrap_or_else(|_| min_values[i] = None); - } - } - _ => { - max_values[i] = None; - min_values[i] = None; - } - } -} - /// Fetches parquet metadata from ObjectStore for given object /// /// This component is a subject to **change** in near future and is exposed for low level integrations @@ -467,7 +385,7 @@ async fn fetch_statistics( statistics_from_parquet_meta(&metadata, table_schema).await } -/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] +/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using ['StatisticsConverter`] /// /// The statistics are calculated for each column in the table schema /// using the row group statistics in the parquet metadata. @@ -475,80 +393,107 @@ pub async fn statistics_from_parquet_meta( metadata: &ParquetMetaData, table_schema: SchemaRef, ) -> Result { - let file_metadata = metadata.file_metadata(); + let row_groups_metadata = metadata.row_groups(); + let mut statistics = Statistics::new_unknown(&table_schema); + let mut has_statistics = false; + let mut num_rows = 0_usize; + let mut total_byte_size = 0_usize; + for row_group_meta in row_groups_metadata { + num_rows += row_group_meta.num_rows() as usize; + total_byte_size += row_group_meta.total_byte_size() as usize; + + if !has_statistics { + row_group_meta.columns().iter().for_each(|column| { + has_statistics = column.statistics().is_some(); + }); + } + } + statistics.num_rows = Precision::Exact(num_rows); + statistics.total_byte_size = Precision::Exact(total_byte_size); + + let file_metadata = metadata.file_metadata(); let file_schema = parquet_to_arrow_schema( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; - let num_fields = table_schema.fields().len(); - let fields = table_schema.fields(); - - let mut num_rows = 0; - let mut total_byte_size = 0; - let mut null_counts = vec![Precision::Exact(0); num_fields]; - let mut has_statistics = false; - - let schema_adapter = - DefaultSchemaAdapterFactory::default().create(table_schema.clone()); + statistics.column_statistics = if has_statistics { + let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); + let mut null_counts_array = + vec![Precision::Exact(0); table_schema.fields().len()]; - let (mut max_values, mut min_values) = create_max_min_accs(&table_schema); - - for row_group_meta in metadata.row_groups() { - num_rows += row_group_meta.num_rows(); - total_byte_size += row_group_meta.total_byte_size(); - - let mut column_stats: HashMap = HashMap::new(); - - for (i, column) in row_group_meta.columns().iter().enumerate() { - if let Some(stat) = column.statistics() { - has_statistics = true; - column_stats.insert(i, (stat.null_count(), stat)); - } - } - - if has_statistics { - for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() { - if let Some(file_idx) = - schema_adapter.map_column_index(table_idx, &file_schema) - { - if let Some((null_count, stats)) = column_stats.get(&file_idx) { - *null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize)); - summarize_min_max( - &mut max_values, - &mut min_values, - fields, - table_idx, - stats, + table_schema + .fields() + .iter() + .enumerate() + .for_each(|(idx, field)| { + match StatisticsConverter::try_new( + field.name(), + &file_schema, + file_metadata.schema_descr(), + ) { + Ok(stats_converter) => { + summarize_min_max_null_counts( + &mut min_accs, + &mut max_accs, + &mut null_counts_array, + idx, + num_rows, + &stats_converter, + row_groups_metadata, ) - } else { - // If none statistics of current column exists, set the Max/Min Accumulator to None. - max_values[table_idx] = None; - min_values[table_idx] = None; + .ok(); + } + Err(e) => { + debug!("Failed to create statistics converter: {}", e); + null_counts_array[idx] = Precision::Exact(num_rows); } - } else { - *null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize)); } - } - } - } + }); - let column_stats = if has_statistics { - get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values) + get_col_stats( + &table_schema, + null_counts_array, + &mut max_accs, + &mut min_accs, + ) } else { Statistics::unknown_column(&table_schema) }; - let statistics = Statistics { - num_rows: Precision::Exact(num_rows as usize), - total_byte_size: Precision::Exact(total_byte_size as usize), - column_statistics: column_stats, - }; - Ok(statistics) } +fn summarize_min_max_null_counts( + min_accs: &mut [Option], + max_accs: &mut [Option], + null_counts_array: &mut [Precision], + arrow_schema_index: usize, + num_rows: usize, + stats_converter: &StatisticsConverter, + row_groups_metadata: &[RowGroupMetaData], +) -> Result<()> { + let max_values = stats_converter.row_group_maxes(row_groups_metadata)?; + let min_values = stats_converter.row_group_mins(row_groups_metadata)?; + let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; + + if let Some(max_acc) = &mut max_accs[arrow_schema_index] { + max_acc.update_batch(&[max_values])?; + } + + if let Some(min_acc) = &mut min_accs[arrow_schema_index] { + min_acc.update_batch(&[min_values])?; + } + + null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) { + Some(null_count) => null_count as usize, + None => num_rows, + }); + + Ok(()) +} + /// Implements [`DataSink`] for writing to a parquet file. pub struct ParquetSink { /// Config options for writing data @@ -1126,7 +1071,8 @@ mod tests { use crate::physical_plan::metrics::MetricValue; use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::{Array, ArrayRef, StringArray}; - use arrow_schema::Field; + use arrow_array::Int64Array; + use arrow_schema::{DataType, Field}; use async_trait::async_trait; use datafusion_common::cast::{ as_binary_array, as_boolean_array, as_float32_array, as_float64_array, @@ -1449,8 +1395,14 @@ mod tests { // column c1 let c1_stats = &stats.column_statistics[0]; assert_eq!(c1_stats.null_count, Precision::Exact(1)); - assert_eq!(c1_stats.max_value, Precision::Absent); - assert_eq!(c1_stats.min_value, Precision::Absent); + assert_eq!( + c1_stats.max_value, + Precision::Exact(ScalarValue::Utf8(Some("bar".to_string()))) + ); + assert_eq!( + c1_stats.min_value, + Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string()))) + ); // column c2: missing from the file so the table treats all 3 rows as null let c2_stats = &stats.column_statistics[1]; assert_eq!(c2_stats.null_count, Precision::Exact(3));