Skip to content

Commit

Permalink
feat: Add support for Int8 and Int16 data types in data page statisti…
Browse files Browse the repository at this point in the history
…cs (#10931)
  • Loading branch information
Weijun-H authored Jun 17, 2024
1 parent c4fd754 commit a923c65
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 21 deletions.
30 changes: 30 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,38 @@ macro_rules! make_data_page_stats_iterator {
};
}

make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min, Index::INT32, i32);
make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max, Index::INT32, i32);
make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);

macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
match $data_type {
Some(DataType::Int8) => Ok(Arc::new(
Int8Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| i8::try_from(x).ok())
})
})
.flatten()
)
)),
Some(DataType::Int16) => Ok(Arc::new(
Int16Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| i16::try_from(x).ok())
})
})
.flatten()
)
)),
Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
_ => unimplemented!()
}
Expand Down Expand Up @@ -642,6 +667,11 @@ where
{
let iter = iterator.flat_map(|(len, index)| match index {
Index::NONE => vec![None; len],
Index::INT32(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::INT64(native_index) => native_index
.indexes
.iter()
Expand Down
24 changes: 3 additions & 21 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,16 +550,11 @@ async fn test_int_32() {
// row counts are [5, 5, 5, 5]
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
column_name: "i32",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}

// BUG: ignore this test for now
// https://github.com/apache/datafusion/issues/10585
// Note that the file has 4 columns named "i8", "i16", "i32", "i64".
// - The tests on column i32 and i64 passed.
// - The tests on column i8 and i16 failed.
#[tokio::test]
async fn test_int_16() {
// This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64"
Expand All @@ -573,16 +568,6 @@ async fn test_int_16() {
Test {
reader: &reader,
// mins are [-5, -4, 0, 5]
// BUG: not sure why this returns same data but in Int32Array type even though I debugged and the columns name is "i16" an its data is Int16
// My debugging tells me the bug is either at:
// 1. The new code to get "iter". See the code in this PR with
// // Get an iterator over the column statistics
// let iter = row_groups
// .iter()
// .map(|x| x.column(parquet_idx).statistics());
// OR
// 2. in the function (and/or its marco) `pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>` here
// https://github.com/apache/datafusion/blob/ea023e2d4878240eece870cf4b346c7a0667aeed/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L179
expected_min: Arc::new(Int16Array::from(vec![-5, -4, 0, 5])), // panic here because the actual data is Int32Array
// maxes are [-1, 0, 4, 9]
expected_max: Arc::new(Int16Array::from(vec![-1, 0, 4, 9])),
Expand All @@ -591,13 +576,11 @@ async fn test_int_16() {
// row counts are [5, 5, 5, 5]
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
column_name: "i16",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}

// BUG (same as above): ignore this test for now
// https://github.com/apache/datafusion/issues/10585
#[tokio::test]
async fn test_int_8() {
// This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64"
Expand All @@ -611,7 +594,6 @@ async fn test_int_8() {
Test {
reader: &reader,
// mins are [-5, -4, 0, 5]
// BUG: not sure why this returns same data but in Int32Array even though I debugged and the columns name is "i8" an its data is Int8
expected_min: Arc::new(Int8Array::from(vec![-5, -4, 0, 5])), // panic here because the actual data is Int32Array
// maxes are [-1, 0, 4, 9]
expected_max: Arc::new(Int8Array::from(vec![-1, 0, 4, 9])),
Expand All @@ -620,7 +602,7 @@ async fn test_int_8() {
// row counts are [5, 5, 5, 5]
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
column_name: "i8",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}
Expand Down

0 comments on commit a923c65

Please sign in to comment.