
Commit

fix misc
xinlifoobar committed Jun 23, 2024
1 parent ab96a17 commit 8e69e62
Showing 4 changed files with 31 additions and 11 deletions.
29 changes: 22 additions & 7 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -393,6 +393,18 @@ pub async fn statistics_from_parquet_meta(
     metadata: &ParquetMetaData,
     table_schema: SchemaRef,
 ) -> Result<Statistics> {
+    let row_groups_metadata = metadata.row_groups();
+
+    let mut has_statistics = false;
+    for row_group_meta in row_groups_metadata {
+        for column in row_group_meta.columns() {
+            if let Some(_) = column.statistics() {
+                has_statistics = true;
+                break;
+            }
+        }
+    }
+
     let file_metadata = metadata.file_metadata();
     let file_schema = parquet_to_arrow_schema(
         file_metadata.schema_descr(),
@@ -402,7 +414,6 @@ pub async fn statistics_from_parquet_meta(
     let mut statistics = Statistics::new_unknown(&table_schema);
     let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
     let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()];
-    let row_groups_metadata = metadata.row_groups();

     // The num_rows needs to be calculated even when the statistics converter fails,
     // because the null counts are derived from the number of rows when the prerequisites are not met.
@@ -480,12 +491,16 @@
         }
     });

-    statistics.column_statistics = get_col_stats(
-        &table_schema,
-        null_counts_array,
-        &mut max_accs,
-        &mut min_accs,
-    );
+    statistics.column_statistics = if has_statistics {
+        get_col_stats(
+            &table_schema,
+            null_counts_array,
+            &mut max_accs,
+            &mut min_accs,
+        )
+    } else {
+        Statistics::unknown_column(&table_schema)
+    };

     Ok(statistics)
 }
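Taken together, these hunks make statistics_from_parquet_meta fall back to Statistics::unknown_column when no row group in the file carries statistics for any column, instead of reporting exact-looking zero null counts. Below is a minimal standalone sketch of the detection step, written with iterator combinators for brevity; RowGroup, Column, and ColumnStats are simplified stand-ins, not the parquet-rs metadata types.

// Standalone sketch of the new fallback check; `RowGroup`, `Column`,
// and `ColumnStats` are simplified stand-ins for the parquet-rs types.
struct ColumnStats;

struct Column {
    statistics: Option<ColumnStats>,
}

struct RowGroup {
    columns: Vec<Column>,
}

// Equivalent to the nested `for` loops above: true if any column of
// any row group carries statistics.
fn any_column_has_statistics(row_groups: &[RowGroup]) -> bool {
    row_groups
        .iter()
        .any(|rg| rg.columns.iter().any(|c| c.statistics.is_some()))
}

fn main() {
    let no_stats = vec![RowGroup {
        columns: vec![Column { statistics: None }],
    }];
    // With no statistics anywhere, the commit now reports unknown column
    // statistics rather than exact zero null counts.
    assert!(!any_column_has_statistics(&no_stats));

    let with_stats = vec![RowGroup {
        columns: vec![Column { statistics: Some(ColumnStats) }],
    }];
    assert!(any_column_has_statistics(&with_stats));
}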
Next changed file:
@@ -54,7 +54,7 @@ mod page_filter;
 mod reader;
 mod row_filter;
 mod row_groups;
-pub mod statistics;
+mod statistics;
 mod writer;

 use crate::datasource::schema_adapter::{
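Dropping pub here makes the statistics module crate-private: code elsewhere in the datafusion crate can still reach its items, but downstream crates no longer can. A small illustration of that visibility rule (plain Rust, not DataFusion code):

// Hypothetical module, not DataFusion code: `pub` items inside a
// private module remain reachable from the rest of the same crate.
mod statistics_like {
    pub fn helper() -> u8 {
        42
    }
}

fn main() {
    // Same-crate access still works; an external crate could not
    // name `statistics_like::helper` at all.
    assert_eq!(statistics_like::helper(), 42);
}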
Next changed file:
@@ -41,12 +41,13 @@ use std::collections::HashSet;
 use std::sync::Arc;

 use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
+use crate::datasource::physical_plan::parquet::statistics::{
+    from_bytes_to_i128, parquet_column,
+};
 use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

 use super::metrics::ParquetFileMetrics;
-use super::statistics::parquet_column;

 /// A [`PagePruningPredicate`] provides the ability to construct a [`RowSelection`]
 /// based on parquet page level statistics, if any
Next changed file:
@@ -67,7 +67,7 @@ pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
 // Copy from arrow-rs
 // https://github.com/apache/arrow-rs/blob/198af7a3f4aa20f9bd003209d9f04b0f37bb120e/parquet/src/arrow/buffer/bit_util.rs#L54
 // Convert the byte slice to fixed length byte array with the length of N.
-fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
+pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
     assert!(b.len() <= N, "Array too large, expected less than {N}");
     let is_negative = (b[0] & 128u8) == 128u8;
     let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
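With sign_extend_be now pub, callers elsewhere in the crate (such as from_bytes_to_i128) can widen truncated big-endian values. Below is a self-contained sketch of its behavior; the function body is completed from the arrow-rs source linked in the comment above, so treat it as illustrative rather than authoritative.

// Copy of the helper for illustration, body completed from the linked
// arrow-rs file: widens a big-endian byte slice to N bytes, filling
// with 0xFF for negative values and 0x00 for positive ones.
fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
    assert!(b.len() <= N, "Array too large, expected less than {N}");
    let is_negative = (b[0] & 128u8) == 128u8;
    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
    for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
        *d = *s;
    }
    result
}

fn main() {
    // -1 stored as a single 0xFF byte widens to sixteen 0xFF bytes,
    // the two's-complement encoding of -1 as an i128.
    assert_eq!(i128::from_be_bytes(sign_extend_be::<16>(&[0xFF])), -1);
    // Positive values are zero-extended instead.
    assert_eq!(i128::from_be_bytes(sign_extend_be::<16>(&[0x01])), 1);
}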
@@ -867,6 +867,10 @@ impl<'a> StatisticsConverter<'a> {
     where
         I: IntoIterator<Item = &'a RowGroupMetaData>,
     {
+        let Some(_) = self.parquet_index else {
+            return Ok(None);
+        };
+
         let mut builder = UInt64Array::builder(10);
         for metadata in metadatas.into_iter() {
             let row_count = metadata.num_rows();
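The let ... else guard makes this method return Ok(None) when the converter has no parquet_index (i.e. the requested column is absent from the file), instead of building row counts that cannot be tied to a column. A minimal sketch of the pattern with hypothetical stand-in types:

// Hypothetical stand-ins, not the DataFusion types: demonstrates the
// early `Ok(None)` guard added above.
struct Converter {
    parquet_index: Option<usize>,
}

impl Converter {
    fn row_counts(&self, counts: &[u64]) -> Result<Option<Vec<u64>>, String> {
        // Bail out early when there is no matching parquet column.
        let Some(_) = self.parquet_index else {
            return Ok(None);
        };
        Ok(Some(counts.to_vec()))
    }
}

fn main() {
    let missing = Converter { parquet_index: None };
    assert_eq!(missing.row_counts(&[3, 5]), Ok(None));

    let present = Converter { parquet_index: Some(0) };
    assert_eq!(present.row_counts(&[3, 5]), Ok(Some(vec![3, 5])));
}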
