Update ListingTable to use StatisticsConverter (#11068)
* Update ListingTable to use StatisticsConverter

* complete support for all parquet types

* Fix misc

* Fix misc

* fix test

* fix misc

* fix none
xinlifoobar authored Jun 24, 2024
1 parent ed7c884 commit 459afbb
Showing 1 changed file with 103 additions and 151 deletions.
254 changes: 103 additions & 151 deletions datafusion/core/src/datasource/file_format/parquet.rs
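
For orientation, here is a minimal, self-contained sketch (not part of this commit) of the `StatisticsConverter` API the diff adopts: it reads per-row-group min/max/null-count statistics out of a parquet footer as Arrow arrays, one entry per row group. The file path and column name are placeholders, and the import path mirrors the crate-internal `use` statement in the diff, so it may differ for users of the published `datafusion` crate.

```rust
// Sketch only: extract per-row-group statistics with StatisticsConverter.
// "data/example.parquet" and column "c1" are placeholders.
use std::fs::File;

use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use datafusion::error::Result;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<()> {
    // Decode only the footer metadata; no row data is read.
    let reader = SerializedFileReader::new(File::open("data/example.parquet")?)?;
    let metadata = reader.metadata();
    let file_meta = metadata.file_metadata();

    // Arrow view of the parquet schema, built the same way as in
    // statistics_from_parquet_meta below.
    let file_schema = parquet_to_arrow_schema(
        file_meta.schema_descr(),
        file_meta.key_value_metadata(),
    )?;

    // One converter per column; each accessor returns an Arrow array with
    // one entry per row group.
    let converter =
        StatisticsConverter::try_new("c1", &file_schema, file_meta.schema_descr())?;
    let mins = converter.row_group_mins(metadata.row_groups())?;
    let maxes = converter.row_group_maxes(metadata.row_groups())?;
    let null_counts = converter.row_group_null_counts(metadata.row_groups())?;
    println!("mins={mins:?} maxes={maxes:?} null_counts={null_counts:?}");
    Ok(())
}
```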
@@ -25,25 +25,20 @@ use std::sync::Arc;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{
Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
Statistics,
};

use arrow::compute::sum;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::stats::Precision;
@@ -52,11 +47,13 @@ use datafusion_common::{
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use log::debug;
use object_store::buffered::BufWriter;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
@@ -66,16 +63,17 @@ use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use crate::datasource::physical_plan::parquet::{
ParquetExecBuilder, StatisticsConverter,
};
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
@@ -295,86 +293,6 @@ impl FileFormat for ParquetFormat {
}
}

fn summarize_min_max(
max_values: &mut [Option<MaxAccumulator>],
min_values: &mut [Option<MinAccumulator>],
fields: &Fields,
i: usize,
stat: &ParquetStatistics,
) {
if !stat.has_min_max_set() {
max_values[i] = None;
min_values[i] = None;
return;
}
match stat {
ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
_ => {
max_values[i] = None;
min_values[i] = None;
}
}
}

/// Fetches parquet metadata from ObjectStore for given object
///
/// This component is a subject to **change** in near future and is exposed for low level integrations
@@ -467,88 +385,115 @@ async fn fetch_statistics(
statistics_from_parquet_meta(&metadata, table_schema).await
}

/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`]
///
/// The statistics are calculated for each column in the table schema
/// using the row group statistics in the parquet metadata.
pub async fn statistics_from_parquet_meta(
metadata: &ParquetMetaData,
table_schema: SchemaRef,
) -> Result<Statistics> {
let file_metadata = metadata.file_metadata();
let row_groups_metadata = metadata.row_groups();

let mut statistics = Statistics::new_unknown(&table_schema);
let mut has_statistics = false;
let mut num_rows = 0_usize;
let mut total_byte_size = 0_usize;
for row_group_meta in row_groups_metadata {
num_rows += row_group_meta.num_rows() as usize;
total_byte_size += row_group_meta.total_byte_size() as usize;

if !has_statistics {
row_group_meta.columns().iter().for_each(|column| {
has_statistics = column.statistics().is_some();
});
}
}
statistics.num_rows = Precision::Exact(num_rows);
statistics.total_byte_size = Precision::Exact(total_byte_size);

let file_metadata = metadata.file_metadata();
let file_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;

let num_fields = table_schema.fields().len();
let fields = table_schema.fields();

let mut num_rows = 0;
let mut total_byte_size = 0;
let mut null_counts = vec![Precision::Exact(0); num_fields];
let mut has_statistics = false;

let schema_adapter =
DefaultSchemaAdapterFactory::default().create(table_schema.clone());
statistics.column_statistics = if has_statistics {
let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
let mut null_counts_array =
vec![Precision::Exact(0); table_schema.fields().len()];

let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);

for row_group_meta in metadata.row_groups() {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();

let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();

for (i, column) in row_group_meta.columns().iter().enumerate() {
if let Some(stat) = column.statistics() {
has_statistics = true;
column_stats.insert(i, (stat.null_count(), stat));
}
}

if has_statistics {
for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
if let Some(file_idx) =
schema_adapter.map_column_index(table_idx, &file_schema)
{
if let Some((null_count, stats)) = column_stats.get(&file_idx) {
*null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize));
summarize_min_max(
&mut max_values,
&mut min_values,
fields,
table_idx,
stats,
table_schema
.fields()
.iter()
.enumerate()
.for_each(|(idx, field)| {
match StatisticsConverter::try_new(
field.name(),
&file_schema,
file_metadata.schema_descr(),
) {
Ok(stats_converter) => {
summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
idx,
num_rows,
&stats_converter,
row_groups_metadata,
)
} else {
// If no statistics exist for the current column, set the Max/Min accumulators to None.
max_values[table_idx] = None;
min_values[table_idx] = None;
.ok();
}
Err(e) => {
debug!("Failed to create statistics converter: {}", e);
null_counts_array[idx] = Precision::Exact(num_rows);
}
} else {
*null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize));
}
}
}
}
});

let column_stats = if has_statistics {
get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values)
get_col_stats(
&table_schema,
null_counts_array,
&mut max_accs,
&mut min_accs,
)
} else {
Statistics::unknown_column(&table_schema)
};

let statistics = Statistics {
num_rows: Precision::Exact(num_rows as usize),
total_byte_size: Precision::Exact(total_byte_size as usize),
column_statistics: column_stats,
};

Ok(statistics)
}
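
A hedged usage sketch of the rewritten function follows. When the supplied table schema contains a column that is absent from the file, that column's statistics fall back to "all rows are null" (see the `c2` assertions in the test changes below). The file path, the extra column name, the tokio runtime, and the public module path of `statistics_from_parquet_meta` are assumptions, not taken from the diff.

```rust
use std::fs::File;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::file_format::parquet::statistics_from_parquet_meta;
use datafusion::error::Result;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};

#[tokio::main]
async fn main() -> Result<()> {
    let reader = SerializedFileReader::new(File::open("data/example.parquet")?)?;
    let metadata = reader.metadata();
    let file_meta = metadata.file_metadata();
    let file_schema = parquet_to_arrow_schema(
        file_meta.schema_descr(),
        file_meta.key_value_metadata(),
    )?;

    // Table schema = file schema plus one column the file does not have;
    // its reported null count becomes the total row count.
    let mut fields: Vec<Field> =
        file_schema.fields().iter().map(|f| f.as_ref().clone()).collect();
    fields.push(Field::new("extra_col", DataType::Int64, true));
    let table_schema = Arc::new(Schema::new(fields));

    let stats = statistics_from_parquet_meta(metadata, table_schema).await?;
    println!(
        "rows={:?} columns={}",
        stats.num_rows,
        stats.column_statistics.len()
    );
    Ok(())
}
```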

fn summarize_min_max_null_counts(
min_accs: &mut [Option<MinAccumulator>],
max_accs: &mut [Option<MaxAccumulator>],
null_counts_array: &mut [Precision<usize>],
arrow_schema_index: usize,
num_rows: usize,
stats_converter: &StatisticsConverter,
row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;

if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
max_acc.update_batch(&[max_values])?;
}

if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
min_acc.update_batch(&[min_values])?;
}

null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
Some(null_count) => null_count as usize,
None => num_rows,
});

Ok(())
}
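
To make the fold above concrete, here is a small self-contained sketch with illustrative values (not taken from the commit) showing how per-row-group maxes collapse into one file-level value via `MaxAccumulator`, and how `arrow::compute::sum` aggregates per-row-group null counts with a fallback to the row count when the sum is unavailable. It assumes the `datafusion_expr` crate for the `Accumulator` trait alongside the crates already imported in the diff.

```rust
use std::sync::Arc;

use arrow::compute::sum;
use arrow_array::{ArrayRef, Int64Array, UInt64Array};
use arrow_schema::DataType;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
use datafusion_physical_expr::expressions::MaxAccumulator;

fn main() -> Result<()> {
    // Per-row-group maxes for one Int64 column, as row_group_maxes would
    // return them (values are made up for the example).
    let maxes: ArrayRef = Arc::new(Int64Array::from(vec![10, 42, 7]));
    let mut max_acc = MaxAccumulator::try_new(&DataType::Int64)?;
    max_acc.update_batch(&[maxes])?;
    assert_eq!(max_acc.evaluate()?, ScalarValue::Int64(Some(42)));

    // Per-row-group null counts; their sum is the file-level null count.
    // If the sum is unavailable, fall back to num_rows, mirroring the match
    // in summarize_min_max_null_counts.
    let null_counts = UInt64Array::from(vec![0, 1, 2]);
    let num_rows = 300_usize;
    let file_null_count = match sum(&null_counts) {
        Some(n) => Precision::Exact(n as usize),
        None => Precision::Exact(num_rows),
    };
    assert_eq!(file_null_count, Precision::Exact(3));
    Ok(())
}
```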

/// Implements [`DataSink`] for writing to a parquet file.
pub struct ParquetSink {
/// Config options for writing data
@@ -1126,7 +1071,8 @@ mod tests {
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::Field;
use arrow_array::Int64Array;
use arrow_schema::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
@@ -1449,8 +1395,14 @@
// column c1
let c1_stats = &stats.column_statistics[0];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c1_stats.max_value, Precision::Absent);
assert_eq!(c1_stats.min_value, Precision::Absent);
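// With StatisticsConverter, Utf8 min/max are now read from the parquet footer,
// so these are exact values rather than Precision::Absent as before.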
assert_eq!(
c1_stats.max_value,
Precision::Exact(ScalarValue::Utf8(Some("bar".to_string())))
);
assert_eq!(
c1_stats.min_value,
Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string())))
);
// column c2: missing from the file so the table treats all 3 rows as null
let c2_stats = &stats.column_statistics[1];
assert_eq!(c2_stats.null_count, Precision::Exact(3));
