Update ListingTable to use StatisticsConverter #11068

Merged · 7 commits · Jun 24, 2024
Changes from 5 commits
277 changes: 127 additions & 150 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,25 +25,20 @@ use std::sync::Arc;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{
Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
Statistics,
};

use arrow::compute::sum;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::stats::Precision;
@@ -52,11 +47,13 @@ use datafusion_common::{
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use log::debug;
use object_store::buffered::BufWriter;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
@@ -66,16 +63,17 @@ use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use crate::datasource::physical_plan::parquet::{
ParquetExecBuilder, StatisticsConverter,
};
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
@@ -295,86 +293,6 @@ impl FileFormat for ParquetFormat {
}
}

fn summarize_min_max(
max_values: &mut [Option<MaxAccumulator>],
min_values: &mut [Option<MinAccumulator>],
fields: &Fields,
i: usize,
stat: &ParquetStatistics,
) {
if !stat.has_min_max_set() {
max_values[i] = None;
min_values[i] = None;
return;
}
match stat {
ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
_ => {
max_values[i] = None;
min_values[i] = None;
}
}
}

/// Fetches parquet metadata from ObjectStore for given object
///
/// This component is subject to **change** in the near future and is exposed for low level integrations
@@ -467,7 +385,7 @@ async fn fetch_statistics(
statistics_from_parquet_meta(&metadata, table_schema).await
}

/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`]
///
/// The statistics are calculated for each column in the table schema
/// using the row group statistics in the parquet metadata.
@@ -476,77 +394,129 @@ pub async fn statistics_from_parquet_meta(
table_schema: SchemaRef,
) -> Result<Statistics> {
let file_metadata = metadata.file_metadata();

let file_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;

let num_fields = table_schema.fields().len();
let fields = table_schema.fields();
let mut statistics = Statistics::new_unknown(&table_schema);
let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()];
let row_groups_metadata = metadata.row_groups();

// num_rows must be calculated even when the statistics converter fails,
// because the null counts fall back to the number of rows when the prerequisites are not met.
// The read_merged_batches test below checks for this behavior.
let num_rows = row_groups_metadata
.iter()
.map(|rg| rg.num_rows() as usize)
.sum();
statistics.num_rows = Precision::Exact(num_rows);

let mut fields_iter = table_schema.fields().iter();
let Some(first_field) = fields_iter.next() else {
return Ok(statistics);
};

let mut num_rows = 0;
let mut total_byte_size = 0;
let mut null_counts = vec![Precision::Exact(0); num_fields];
let mut has_statistics = false;
let option_stats_converter = match StatisticsConverter::try_new(
first_field.name(),
&file_schema,
file_metadata.schema_descr(),
) {
Ok(sc) => Some(sc),
Err(e) => {
debug!("Failed to create statistics converter: {}", e);
null_counts_array[0] = Precision::Exact(num_rows);
None
}
};

let schema_adapter =
DefaultSchemaAdapterFactory::default().create(table_schema.clone());
if let Some(stats_converter) = option_stats_converter {
let Some(total_byte_size_array) =
stats_converter.row_group_row_total_bytes(row_groups_metadata)?
else {
return Ok(statistics);
};
let total_byte_size = sum(&total_byte_size_array).unwrap_or_default() as usize;
statistics.total_byte_size = Precision::Exact(total_byte_size);

summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
0,
num_rows,
&stats_converter,
row_groups_metadata,
)?;
}

let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
fields_iter.enumerate().for_each(|(idx, field)| {
match StatisticsConverter::try_new(
field.name(),
&file_schema,
file_metadata.schema_descr(),
) {
Ok(stats_converter) => {
summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
idx + 1,
num_rows,
&stats_converter,
row_groups_metadata,
)
.ok();
}
Err(e) => {
debug!("Failed to create statistics converter: {}", e);
null_counts_array[idx + 1] = Precision::Exact(num_rows);
}
}
});

for row_group_meta in metadata.row_groups() {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();
statistics.column_statistics = get_col_stats(
&table_schema,
null_counts_array,
&mut max_accs,
&mut min_accs,
);

let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
Ok(statistics)
}

for (i, column) in row_group_meta.columns().iter().enumerate() {
if let Some(stat) = column.statistics() {
has_statistics = true;
column_stats.insert(i, (stat.null_count(), stat));
}
}
fn summarize_min_max_null_counts(
min_accs: &mut [Option<MinAccumulator>],
max_accs: &mut [Option<MaxAccumulator>],
null_counts_array: &mut [Precision<usize>],
arrow_schema_index: usize,
num_rows: usize,
stats_converter: &StatisticsConverter,
row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;

if has_statistics {
for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
if let Some(file_idx) =
schema_adapter.map_column_index(table_idx, &file_schema)
{
if let Some((null_count, stats)) = column_stats.get(&file_idx) {
*null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize));
summarize_min_max(
&mut max_values,
&mut min_values,
fields,
table_idx,
stats,
)
} else {
// If no statistics exist for the current column, set the Max/Min accumulators to None.
max_values[table_idx] = None;
min_values[table_idx] = None;
}
} else {
*null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize));
}
}
}
if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
max_acc.update_batch(&[max_values])?;
}

let column_stats = if has_statistics {
get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values)
} else {
Statistics::unknown_column(&table_schema)
};
if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
min_acc.update_batch(&[min_values])?;
}

let statistics = Statistics {
num_rows: Precision::Exact(num_rows as usize),
total_byte_size: Precision::Exact(total_byte_size as usize),
column_statistics: column_stats,
};
null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
Some(null_count) => null_count as usize,
None => num_rows,
});

Ok(statistics)
Ok(())
}

/// Implements [`DataSink`] for writing to a parquet file.
@@ -1126,7 +1096,8 @@ mod tests {
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::Field;
use arrow_array::Int64Array;
use arrow_schema::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
@@ -1449,8 +1420,14 @@
// column c1
let c1_stats = &stats.column_statistics[0];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c1_stats.max_value, Precision::Absent);
assert_eq!(c1_stats.min_value, Precision::Absent);
The PR author commented:
The test behaviour changes because Utf8 is not supported in the removed summarize_min_max:

fn summarize_min_max(
max_values: &mut [Option<MaxAccumulator>],
min_values: &mut [Option<MinAccumulator>],
fields: &Fields,
i: usize,
stat: &ParquetStatistics,
) { /* ... */ }

(The quoted function is the removed summarize_min_max shown in full in the diff above; its match arms cover only Boolean, Int32, Int64, Float32, and Float64, so Utf8 statistics fell through to the catch-all arm that cleared the accumulators.)

alamb (Contributor) commented on Jun 24, 2024:
It is an improvement!

assert_eq!(
c1_stats.max_value,
Precision::Exact(ScalarValue::Utf8(Some("bar".to_string())))
);
assert_eq!(
c1_stats.min_value,
Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string())))
);
// column c2: missing from the file so the table treats all 3 rows as null
let c2_stats = &stats.column_statistics[1];
assert_eq!(c2_stats.null_count, Precision::Exact(3));
datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -54,7 +54,7 @@ mod page_filter;
mod reader;
mod row_filter;
mod row_groups;
mod statistics;
pub mod statistics;
mod writer;

use crate::datasource::schema_adapter::{
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -41,13 +41,12 @@ use std::collections::HashSet;
use std::sync::Arc;

use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
use crate::datasource::physical_plan::parquet::statistics::{
from_bytes_to_i128, parquet_column,
};
use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::metrics::ParquetFileMetrics;
use super::statistics::parquet_column;

/// A [`PagePruningPredicate`] provides the ability to construct a [`RowSelection`]
/// based on parquet page level statistics, if any