Commit 42fcbbd: fix none

xinlifoobar committed Jun 23, 2024
1 parent 8e69e62
Showing 2 changed files with 44 additions and 136 deletions.
128 changes: 44 additions & 84 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -395,103 +395,63 @@ pub async fn statistics_from_parquet_meta(
) -> Result<Statistics> {
let row_groups_metadata = metadata.row_groups();

let mut statistics = Statistics::new_unknown(&table_schema);
let mut has_statistics = false;
let mut num_rows = 0_usize;
let mut total_byte_size = 0_usize;
for row_group_meta in row_groups_metadata {
for column in row_group_meta.columns() {
if let Some(_) = column.statistics() {
has_statistics = true;
break;
}
num_rows += row_group_meta.num_rows() as usize;
total_byte_size += row_group_meta.total_byte_size() as usize;

if !has_statistics {
row_group_meta.columns().iter().for_each(|column| {
has_statistics = column.statistics().is_some();
});
}
}
statistics.num_rows = Precision::Exact(num_rows);
statistics.total_byte_size = Precision::Exact(total_byte_size);

let file_metadata = metadata.file_metadata();
let file_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;

let mut statistics = Statistics::new_unknown(&table_schema);
let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()];

// The num_rows needs to be calculated even when the statistics converter fails,
// because the null counts are derived from the number of rows when the prerequisites are not met.
// The test read_merged_batches below checks for this behavior.
let num_rows = row_groups_metadata
.iter()
.map(|rg| rg.num_rows() as usize)
.sum();
statistics.num_rows = Precision::Exact(num_rows);

let mut fields_iter = table_schema.fields().iter();
let Some(first_field) = fields_iter.next() else {
return Ok(statistics);
};

let option_stats_converter;
match StatisticsConverter::try_new(
first_field.name(),
&file_schema,
file_metadata.schema_descr(),
) {
Ok(sc) => {
option_stats_converter = Some(sc);
}
Err(e) => {
debug!("Failed to create statistics converter: {}", e);
option_stats_converter = None;
null_counts_array[0] = Precision::Exact(num_rows);
}
};

if option_stats_converter.is_some() {
let stats_converter = option_stats_converter.unwrap();
let Some(total_byte_size_array) =
stats_converter.row_group_row_total_bytes(row_groups_metadata)?
else {
return Ok(statistics);
};
let total_byte_size = sum(&total_byte_size_array).unwrap_or_default() as usize;
statistics.total_byte_size = Precision::Exact(total_byte_size);

summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
0,
num_rows,
&stats_converter,
row_groups_metadata,
)?;
}
statistics.column_statistics = if has_statistics {
let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
let mut null_counts_array =
vec![Precision::Exact(0); table_schema.fields().len()];

fields_iter.enumerate().for_each(|(idx, field)| {
match StatisticsConverter::try_new(
field.name(),
&file_schema,
file_metadata.schema_descr(),
) {
Ok(stats_converter) => {
summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
idx + 1,
num_rows,
&stats_converter,
row_groups_metadata,
)
.ok();
}
Err(e) => {
debug!("Failed to create statistics converter: {}", e);
null_counts_array[idx + 1] = Precision::Exact(num_rows);
}
}
});
table_schema
.fields()
.iter()
.enumerate()
.for_each(|(idx, field)| {
match StatisticsConverter::try_new(
field.name(),
&file_schema,
file_metadata.schema_descr(),
) {
Ok(stats_converter) => {
summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
idx,
num_rows,
&stats_converter,
row_groups_metadata,
)
.ok();
}
Err(e) => {
debug!("Failed to create statistics converter: {}", e);
null_counts_array[idx] = Precision::Exact(num_rows);
}
}
});

statistics.column_statistics = if has_statistics {
get_col_stats(
&table_schema,
null_counts_array,
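Since the rendered hunk above interleaves removed and added lines, the net shape of the change is easier to see in isolation: the updated `statistics_from_parquet_meta` goes back to aggregating `num_rows`, `total_byte_size`, and `has_statistics` in a single pass over the row-group metadata instead of deriving them through `StatisticsConverter`. The sketch below illustrates only that aggregation pattern; `RowGroupMeta`, `aggregate`, and the sample values are illustrative stand-ins for the parquet crate's `RowGroupMetaData` API, and the per-column check is condensed to `any`, which matches the intent of the `for_each` in the diff.

```rust
// Stand-in for the parquet crate's RowGroupMetaData: only the fields this loop reads.
struct RowGroupMeta {
    num_rows: i64,
    total_byte_size: i64,
    column_has_stats: Vec<bool>, // whether each column chunk carries statistics
}

// One pass over the row groups, as in the added lines of the hunk above: sum row
// counts and byte sizes, and record whether any column chunk has statistics at all.
fn aggregate(row_groups: &[RowGroupMeta]) -> (usize, usize, bool) {
    let mut num_rows = 0_usize;
    let mut total_byte_size = 0_usize;
    let mut has_statistics = false;
    for rg in row_groups {
        num_rows += rg.num_rows as usize;
        total_byte_size += rg.total_byte_size as usize;
        if !has_statistics {
            // condensed to `any`; the diff walks the column chunks with `for_each`
            has_statistics = rg.column_has_stats.iter().any(|has| *has);
        }
    }
    (num_rows, total_byte_size, has_statistics)
}

fn main() {
    let row_groups = vec![
        RowGroupMeta { num_rows: 100, total_byte_size: 4096, column_has_stats: vec![true, false] },
        RowGroupMeta { num_rows: 50, total_byte_size: 2048, column_has_stats: vec![false, false] },
    ];
    let (rows, bytes, has_stats) = aggregate(&row_groups);
    println!("rows={rows} bytes={bytes} has_stats={has_stats}");
}
```

With these values computed up front, `statistics.num_rows` and `statistics.total_byte_size` are set to `Precision::Exact` before any column-level statistics are extracted, so they stay exact even when a `StatisticsConverter` cannot be created for some column.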
52 changes: 0 additions & 52 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -884,58 +884,6 @@ impl<'a> StatisticsConverter<'a> {
Ok(Some(builder.finish()))
}

/// Returns a [`UInt64Array`] with total byte sizes for each row group
///
/// # Return Value
///
/// The returned array has no nulls, and has one value for each row group.
/// Each value is the total byte size of the row group.
///
/// # Example
/// ```no_run
/// # use arrow::datatypes::Schema;
/// # use arrow_array::ArrayRef;
/// # use parquet::file::metadata::ParquetMetaData;
/// # use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
/// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
/// # fn get_arrow_schema() -> Schema { unimplemented!() }
/// // Given the metadata for a parquet file and the arrow schema
/// let metadata: ParquetMetaData = get_parquet_metadata();
/// let arrow_schema: Schema = get_arrow_schema();
/// let parquet_schema = metadata.file_metadata().schema_descr();
/// // create a converter
/// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
/// .unwrap();
/// // get the total byte size for each row group
/// let total_byte_sizes = converter.row_group_row_total_bytes(metadata
/// .row_groups()
/// .iter()
/// );
/// ```
pub fn row_group_row_total_bytes<I>(
&self,
metadatas: I,
) -> Result<Option<UInt64Array>>
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let Some(_) = self.parquet_index else {
return Ok(None);
};

let mut builder = UInt64Array::builder(10);
for metadata in metadatas.into_iter() {
let total_byte_size = metadata.total_byte_size();
let total_byte_size: u64 = total_byte_size.try_into().map_err(|e| {
internal_datafusion_err!(
"Parquet total byte size {total_byte_size} too large to convert to u64: {e}"
)
})?;
builder.append_value(total_byte_size);
}
Ok(Some(builder.finish()))
}

/// Create a new `StatisticsConverter` to extract statistics for a column
///
/// Note if there is no corresponding column in the parquet file, the returned
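The removal of `row_group_row_total_bytes` presumably follows from the change above: the caller now reads `RowGroupMetaData::total_byte_size()` directly while looping over row groups, so a converter method that only repackaged that value into a `UInt64Array` is no longer needed. For reference, here is a minimal sketch of the equivalent per-row-group array, assuming arrow's `UInt64Array` builder API as used in the deleted code; `RowGroupMeta` and `total_bytes_per_row_group` are illustrative stand-ins, and the error handling tied to `StatisticsConverter`'s `parquet_index` is omitted.

```rust
use arrow_array::UInt64Array;

// Stand-in for the parquet crate's RowGroupMetaData; only total_byte_size is used here.
struct RowGroupMeta {
    total_byte_size: i64,
}

// Build one u64 value per row group, mirroring what the deleted helper returned.
fn total_bytes_per_row_group(row_groups: &[RowGroupMeta]) -> UInt64Array {
    let mut builder = UInt64Array::builder(row_groups.len());
    for rg in row_groups {
        // total_byte_size is i64 in the parquet metadata; the sketch clamps
        // negative values to 0 instead of returning an error.
        builder.append_value(rg.total_byte_size.max(0) as u64);
    }
    builder.finish()
}

fn main() {
    let row_groups = vec![
        RowGroupMeta { total_byte_size: 4096 },
        RowGroupMeta { total_byte_size: 2048 },
    ];
    let sizes = total_bytes_per_row_group(&row_groups);
    assert_eq!(sizes.value(0), 4096);
    assert_eq!(sizes.value(1), 2048);
}
```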
