diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 5df3b9e424d5..4204593eba96 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -395,15 +395,22 @@ pub async fn statistics_from_parquet_meta(
 ) -> Result<Statistics> {
     let row_groups_metadata = metadata.row_groups();
 
+    let mut statistics = Statistics::new_unknown(&table_schema);
     let mut has_statistics = false;
+    let mut num_rows = 0_usize;
+    let mut total_byte_size = 0_usize;
     for row_group_meta in row_groups_metadata {
-        for column in row_group_meta.columns() {
-            if let Some(_) = column.statistics() {
-                has_statistics = true;
-                break;
-            }
+        num_rows += row_group_meta.num_rows() as usize;
+        total_byte_size += row_group_meta.total_byte_size() as usize;
+
+        if !has_statistics {
+            row_group_meta.columns().iter().for_each(|column| {
+                has_statistics = column.statistics().is_some();
+            });
         }
     }
+    statistics.num_rows = Precision::Exact(num_rows);
+    statistics.total_byte_size = Precision::Exact(total_byte_size);
 
     let file_metadata = metadata.file_metadata();
     let file_schema = parquet_to_arrow_schema(
@@ -411,87 +418,40 @@ pub async fn statistics_from_parquet_meta(
         file_metadata.key_value_metadata(),
     )?;
 
-    let mut statistics = Statistics::new_unknown(&table_schema);
-    let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
-    let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()];
-
-    // The num_rows needs to be calculated even when the statistics converter fails.
-    // This is due to the null counts being calculated based on the number of rows when the prerequisites is not met.
-    // Below the test read_merged_batches checks for this behavior.
-    let num_rows = row_groups_metadata
-        .iter()
-        .map(|rg| rg.num_rows() as usize)
-        .sum();
-    statistics.num_rows = Precision::Exact(num_rows);
-
-    let mut fields_iter = table_schema.fields().iter();
-    let Some(first_field) = fields_iter.next() else {
-        return Ok(statistics);
-    };
-
-    let option_stats_converter;
-    match StatisticsConverter::try_new(
-        first_field.name(),
-        &file_schema,
-        file_metadata.schema_descr(),
-    ) {
-        Ok(sc) => {
-            option_stats_converter = Some(sc);
-        }
-        Err(e) => {
-            debug!("Failed to create statistics converter: {}", e);
-            option_stats_converter = None;
-            null_counts_array[0] = Precision::Exact(num_rows);
-        }
-    };
-
-    if option_stats_converter.is_some() {
-        let stats_converter = option_stats_converter.unwrap();
-        let Some(total_byte_size_array) =
-            stats_converter.row_group_row_total_bytes(row_groups_metadata)?
-        else {
-            return Ok(statistics);
-        };
-        let total_byte_size = sum(&total_byte_size_array).unwrap_or_default() as usize;
-        statistics.total_byte_size = Precision::Exact(total_byte_size);
-
-        summarize_min_max_null_counts(
-            &mut min_accs,
-            &mut max_accs,
-            &mut null_counts_array,
-            0,
-            num_rows,
-            &stats_converter,
-            row_groups_metadata,
-        )?;
-    }
+    statistics.column_statistics = if has_statistics {
+        let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
+        let mut null_counts_array =
+            vec![Precision::Exact(0); table_schema.fields().len()];
 
-    fields_iter.enumerate().for_each(|(idx, field)| {
-        match StatisticsConverter::try_new(
-            field.name(),
-            &file_schema,
-            file_metadata.schema_descr(),
-        ) {
-            Ok(stats_converter) => {
-                summarize_min_max_null_counts(
-                    &mut min_accs,
-                    &mut max_accs,
-                    &mut null_counts_array,
-                    idx + 1,
-                    num_rows,
-                    &stats_converter,
-                    row_groups_metadata,
-                )
-                .ok();
-            }
-            Err(e) => {
-                debug!("Failed to create statistics converter: {}", e);
-                null_counts_array[idx + 1] = Precision::Exact(num_rows);
-            }
-        }
-    });
+        table_schema
+            .fields()
+            .iter()
+            .enumerate()
+            .for_each(|(idx, field)| {
+                match StatisticsConverter::try_new(
+                    field.name(),
+                    &file_schema,
+                    file_metadata.schema_descr(),
+                ) {
+                    Ok(stats_converter) => {
+                        summarize_min_max_null_counts(
+                            &mut min_accs,
+                            &mut max_accs,
+                            &mut null_counts_array,
+                            idx,
+                            num_rows,
+                            &stats_converter,
+                            row_groups_metadata,
+                        )
+                        .ok();
+                    }
+                    Err(e) => {
+                        debug!("Failed to create statistics converter: {}", e);
+                        null_counts_array[idx] = Precision::Exact(num_rows);
+                    }
+                }
+            });
 
-    statistics.column_statistics = if has_statistics {
         get_col_stats(
             &table_schema,
             null_counts_array,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 7046e61d75d4..3be060ce6180 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -884,58 +884,6 @@ impl<'a> StatisticsConverter<'a> {
         Ok(Some(builder.finish()))
     }
 
-    /// Returns a [`UInt64Array`] with total byte sizes for each row group
-    ///
-    /// # Return Value
-    ///
-    /// The returned array has no nulls, and has one value for each row group.
-    /// Each value is the total byte size of the row group.
-    ///
-    /// # Example
-    /// ```no_run
-    /// # use arrow::datatypes::Schema;
-    /// # use arrow_array::ArrayRef;
-    /// # use parquet::file::metadata::ParquetMetaData;
-    /// # use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
-    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
-    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
-    /// // Given the metadata for a parquet file and the arrow schema
-    /// let metadata: ParquetMetaData = get_parquet_metadata();
-    /// let arrow_schema: Schema = get_arrow_schema();
-    /// let parquet_schema = metadata.file_metadata().schema_descr();
-    /// // create a converter
-    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
-    ///     .unwrap();
-    /// // get the row counts for each row group
-    /// let row_counts = converter.row_group_row_total_bytes(metadata
-    ///     .row_groups()
-    ///     .iter()
-    /// );
-    /// ```
-    pub fn row_group_row_total_bytes<I>(
-        &self,
-        metadatas: I,
-    ) -> Result<Option<UInt64Array>>
-    where
-        I: IntoIterator<Item = &'a RowGroupMetaData>,
-    {
-        let Some(_) = self.parquet_index else {
-            return Ok(None);
-        };
-
-        let mut builder = UInt64Array::builder(10);
-        for metadata in metadatas.into_iter() {
-            let row_count = metadata.total_byte_size();
-            let row_count: u64 = row_count.try_into().map_err(|e| {
-                internal_datafusion_err!(
-                    "Parquet row count {row_count} too large to convert to u64: {e}"
-                )
-            })?;
-            builder.append_value(row_count);
-        }
-        Ok(Some(builder.finish()))
-    }
-
     /// Create a new `StatisticsConverter` to extract statistics for a column
     ///
     /// Note if there is no corresponding column in the parquet file, the returned
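
Note for reviewers: the new code path in `statistics_from_parquet_meta` derives `num_rows` and `total_byte_size` directly from the row-group metadata in a single pass, instead of building a `UInt64Array` via the removed `row_group_row_total_bytes` and summing it. A minimal standalone sketch of that aggregation pattern, using the `parquet` crate's `RowGroupMetaData` accessors (`num_rows()` and `total_byte_size()`, both of which return `i64`); the free function `row_group_totals` is hypothetical, for illustration only:

```rust
use parquet::file::metadata::RowGroupMetaData;

/// Hypothetical helper mirroring the aggregation introduced by this patch:
/// one pass over the row groups yields both totals.
fn row_group_totals(row_groups: &[RowGroupMetaData]) -> (usize, usize) {
    let mut num_rows = 0_usize;
    let mut total_byte_size = 0_usize;
    for rg in row_groups {
        // Both accessors return i64; like the patch, this casts with
        // `as usize` rather than failing on out-of-range values.
        num_rows += rg.num_rows() as usize;
        total_byte_size += rg.total_byte_size() as usize;
    }
    (num_rows, total_byte_size)
}
```

Compared with the removed converter method, this avoids allocating an intermediate array only to sum it, and `total_byte_size` is now always populated exactly rather than being left unknown when a statistics converter cannot be created for the first column.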