
Commit 2d70413

Write null counts in parquet files when they are present

1 parent 69b17ad

4 files changed: +252 -54 lines changed

parquet/src/arrow/arrow_reader/statistics.rs (+26 -1)

```diff
@@ -1179,6 +1179,8 @@ pub struct StatisticsConverter<'a> {
     parquet_column_index: Option<usize>,
     /// The field (with data type) of the column in the Arrow schema
     arrow_field: &'a Field,
+    /// treat missing null_counts as 0 nulls
+    missing_null_counts_as_zero: bool,
 }

 impl<'a> StatisticsConverter<'a> {
@@ -1195,6 +1197,18 @@ impl<'a> StatisticsConverter<'a> {
         self.arrow_field
     }

+    /// Set the statistics converter to treat missing null counts as missing
+    ///
+    /// By default, the converter will treat missing null counts as 0 nulls.
+    ///
+    /// Due to <https://github.com/apache/arrow-rs/pull/6257>, prior to version
+    /// 53.0.0, parquet files written by parquet-rs did not store null counts
+    /// when there were zero nulls.
+    pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
+        self.missing_null_counts_as_zero = missing_null_counts_as_zero;
+        self
+    }
+
     /// Returns a [`UInt64Array`] with row counts for each row group
     ///
     /// # Return Value
@@ -1288,6 +1302,7 @@ impl<'a> StatisticsConverter<'a> {
         Ok(Self {
             parquet_column_index: parquet_index,
             arrow_field,
+            missing_null_counts_as_zero: true,
         })
     }

@@ -1386,7 +1401,15 @@ impl<'a> StatisticsConverter<'a> {
         let null_counts = metadatas
             .into_iter()
             .map(|x| x.column(parquet_index).statistics())
-            .map(|s| s.and_then(|s| s.null_count_opt()));
+            .map(|s| {
+                s.and_then(|s| {
+                    if self.missing_null_counts_as_zero {
+                        Some(s.null_count_opt().unwrap_or(0))
+                    } else {
+                        s.null_count_opt()
+                    }
+                })
+            });
         Ok(UInt64Array::from_iter(null_counts))
     }

@@ -1597,3 +1620,5 @@ impl<'a> StatisticsConverter<'a> {
         new_null_array(data_type, num_row_groups)
     }
 }
+
+// See tests in parquet/tests/arrow_reader/statistics.rs
```
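For orientation, here is a minimal sketch of how a caller might combine the new builder-style flag with the existing reader APIs. The file name `data.parquet` and column name `my_column` are illustrative, not part of this commit:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn null_counts_example() -> parquet::errors::Result<()> {
    // Hypothetical input file; any parquet file with statistics works.
    let file = File::open("data.parquet").expect("open hypothetical example file");
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let arrow_schema = builder.schema().clone();
    let parquet_schema = builder.parquet_schema();

    let converter = StatisticsConverter::try_new("my_column", &arrow_schema, parquet_schema)?
        // opt out of the default so a missing null count surfaces as None
        .with_missing_null_counts_as_zero(false);

    let row_groups = builder.metadata().row_groups();
    let null_counts = converter.row_group_null_counts(row_groups.iter())?;
    println!("{null_counts:?}");
    Ok(())
}
```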

parquet/src/file/statistics.rs (+136 -24)

````diff
@@ -125,22 +125,35 @@ pub fn from_thrift(
 ) -> Result<Option<Statistics>> {
     Ok(match thrift_stats {
         Some(stats) => {
-            // Number of nulls recorded, when it is not available, we just mark it as 0.
-            // TODO this should be `None` if there is no information about NULLS.
-            // see https://github.com/apache/arrow-rs/pull/6216/files
-            let null_count = stats.null_count.unwrap_or(0);
-
-            if null_count < 0 {
-                return Err(ParquetError::General(format!(
-                    "Statistics null count is negative {}",
-                    null_count
-                )));
-            }
+            // transform null count to u64
+            let null_count = stats
+                .null_count
+                .map(|null_count| {
+                    if null_count < 0 {
+                        return Err(ParquetError::General(format!(
+                            "Statistics null count is negative {}",
+                            null_count
+                        )));
+                    }
+                    Ok(null_count as u64)
+                })
+                .transpose()?;

-            // Generic null count.
-            let null_count = Some(null_count as u64);
             // Generic distinct count (count of distinct values occurring)
-            let distinct_count = stats.distinct_count.map(|value| value as u64);
+            let distinct_count = stats
+                .distinct_count
+                .map(|distinct_count| {
+                    if distinct_count < 0 {
+                        return Err(ParquetError::General(format!(
+                            "Statistics distinct count is negative {}",
+                            distinct_count
+                        )));
+                    }
+
+                    Ok(distinct_count as u64)
+                })
+                .transpose()?;
+
             // Whether or not statistics use deprecated min/max fields.
             let old_format = stats.min_value.is_none() && stats.max_value.is_none();
             // Generic min value as bytes.
@@ -244,20 +257,21 @@ pub fn from_thrift(
 pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
     let stats = stats?;

-    // record null counts if greater than zero.
-    //
-    // TODO: This should be Some(0) if there are no nulls.
-    // see https://github.com/apache/arrow-rs/pull/6216/files
+    // record null count if it can fit in i64
     let null_count = stats
         .null_count_opt()
-        .map(|value| value as i64)
-        .filter(|&x| x > 0);
+        .and_then(|value| i64::try_from(value).ok());
+
+    // record distinct count if it can fit in i64
+    let distinct_count = stats
+        .distinct_count()
+        .and_then(|value| i64::try_from(value).ok());

     let mut thrift_stats = TStatistics {
         max: None,
         min: None,
         null_count,
-        distinct_count: stats.distinct_count().map(|value| value as i64),
+        distinct_count,
         max_value: None,
         min_value: None,
         is_max_value_exact: None,
@@ -404,9 +418,20 @@ impl Statistics {
     /// Returns number of null values for the column, if known.
     /// Note that this includes all nulls when column is part of the complex type.
     ///
-    /// Note this API returns Some(0) even if the null count was not present
-    /// in the statistics.
-    /// See <https://github.com/apache/arrow-rs/pull/6216/files>
+    /// Note: Versions of this library prior to `53.0.0` returned 0 if the null count was
+    /// not available. This method returns `None` in that case.
+    ///
+    /// Also, versions of this library prior to `53.0.0` did not store the null count in the
+    /// statistics if the null count was `0`.
+    ///
+    /// To preserve the prior behavior and read null counts properly from older files
+    /// you should default to zero:
+    ///
+    /// ```no_run
+    /// # use parquet::file::statistics::Statistics;
+    /// # let statistics: Statistics = todo!();
+    /// let null_count = statistics.null_count_opt().unwrap_or(0);
+    /// ```
     pub fn null_count_opt(&self) -> Option<u64> {
         statistics_enum_func![self, null_count_opt]
     }
@@ -1041,4 +1066,91 @@ mod tests {
             true,
         ));
     }
+
+    #[test]
+    fn test_count_encoding() {
+        statistics_count_test(None, None);
+        statistics_count_test(Some(0), Some(0));
+        statistics_count_test(Some(100), Some(2000));
+        statistics_count_test(Some(1), None);
+        statistics_count_test(None, Some(1));
+    }
+
+    #[test]
+    fn test_count_encoding_distinct_too_large() {
+        // statistics are stored using i64, so test trying to store larger values
+        let statistics = make_bool_stats(Some(u64::MAX), Some(100));
+        let thrift_stats = to_thrift(Some(&statistics)).unwrap();
+        assert_eq!(thrift_stats.distinct_count, None); // can't store u64 max --> null
+        assert_eq!(thrift_stats.null_count, Some(100));
+    }
+
+    #[test]
+    fn test_count_encoding_null_too_large() {
+        // statistics are stored using i64, so test trying to store larger values
+        let statistics = make_bool_stats(Some(100), Some(u64::MAX));
+        let thrift_stats = to_thrift(Some(&statistics)).unwrap();
+        assert_eq!(thrift_stats.distinct_count, Some(100));
+        assert_eq!(thrift_stats.null_count, None); // can't store u64 max --> null
+    }
+
+    #[test]
+    fn test_count_decoding_distinct_invalid() {
+        let tstatistics = TStatistics {
+            distinct_count: Some(-42),
+            ..Default::default()
+        };
+        let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Statistics distinct count is negative -42"
+        );
+    }
+
+    #[test]
+    fn test_count_decoding_null_invalid() {
+        let tstatistics = TStatistics {
+            null_count: Some(-42),
+            ..Default::default()
+        };
+        let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: Statistics null count is negative -42"
+        );
+    }
+
+    /// Writes statistics to thrift and reads them back and ensures:
+    /// - The statistics are the same
+    /// - The statistics written to thrift are the same as the original statistics
+    fn statistics_count_test(distinct_count: Option<u64>, null_count: Option<u64>) {
+        let statistics = make_bool_stats(distinct_count, null_count);
+
+        let thrift_stats = to_thrift(Some(&statistics)).unwrap();
+        assert_eq!(thrift_stats.null_count.map(|c| c as u64), null_count);
+        assert_eq!(
+            thrift_stats.distinct_count.map(|c| c as u64),
+            distinct_count
+        );
+
+        let round_tripped = from_thrift(Type::BOOLEAN, Some(thrift_stats))
+            .unwrap()
+            .unwrap();
+        assert_eq!(round_tripped, statistics);
+    }
+
+    fn make_bool_stats(distinct_count: Option<u64>, null_count: Option<u64>) -> Statistics {
+        let min = Some(true);
+        let max = Some(false);
+        let is_min_max_deprecated = false;

+        // test is about the counts, so we aren't really testing the min/max values
+        Statistics::Boolean(ValueStatistics::new(
+            min,
+            max,
+            distinct_count,
+            null_count,
+            is_min_max_deprecated,
+        ))
+    }
 }
````
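The key change in `to_thrift` is the `i64::try_from` guard: thrift models both counts as signed 64-bit integers, so a `u64` count that cannot be represented is now omitted instead of being written as a wrapped negative value (which `from_thrift` would reject). A standalone sketch of that behavior; the `encode_count` helper is illustrative, not part of the library:

```rust
fn encode_count(count: Option<u64>) -> Option<i64> {
    // Counts that do not fit in i64 are omitted entirely rather than
    // wrapped into a negative value.
    count.and_then(|value| i64::try_from(value).ok())
}

fn main() {
    assert_eq!(encode_count(Some(100)), Some(100));
    assert_eq!(encode_count(Some(u64::MAX)), None); // too large to represent
    assert_eq!(encode_count(None), None);
}
```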

parquet/tests/arrow_reader/statistics.rs (+64 -3)

```diff
@@ -22,6 +22,7 @@ use std::default::Default;
 use std::fs::File;
 use std::sync::Arc;

+use super::make_test_file_rg;
 use super::{struct_array, Scenario};
 use arrow::compute::kernels::cast_utils::Parser;
 use arrow::datatypes::{
@@ -37,16 +38,17 @@ use arrow_array::{
     TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
     UInt32Array, UInt64Array, UInt8Array,
 };
-use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use half::f16;
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::arrow::arrow_reader::{
     ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
 };
 use parquet::arrow::ArrowWriter;
+use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
 use parquet::file::properties::{EnabledStatistics, WriterProperties};
-
-use super::make_test_file_rg;
+use parquet::file::statistics::{Statistics, ValueStatistics};
+use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};

 #[derive(Debug, Default, Clone)]
 struct Int64Case {
@@ -2139,6 +2141,65 @@ async fn test_missing_statistics() {
         .run();
 }

+#[test]
+fn missing_null_counts_as_zero() {
+    let min = None;
+    let max = None;
+    let distinct_count = None;
+    let null_count = None; // NB: no null count
+    let is_min_max_deprecated = false;
+    let stats = Statistics::Boolean(ValueStatistics::new(
+        min,
+        max,
+        distinct_count,
+        null_count,
+        is_min_max_deprecated,
+    ));
+    let (arrow_schema, parquet_schema) = bool_arrow_and_parquet_schema();
+
+    let column_chunk = ColumnChunkMetaData::builder(parquet_schema.column(0))
+        .set_statistics(stats)
+        .build()
+        .unwrap();
+    let metadata = RowGroupMetaData::builder(parquet_schema.clone())
+        .set_column_metadata(vec![column_chunk])
+        .build()
+        .unwrap();
+
+    let converter = StatisticsConverter::try_new("b", &arrow_schema, &parquet_schema).unwrap();
+
+    // by default null count should be 0
+    assert_eq!(
+        converter.row_group_null_counts([&metadata]).unwrap(),
+        UInt64Array::from_iter(vec![Some(0)])
+    );
+
+    // if we disable the missing-null-counts-as-zero flag, the null count will be None
+    let converter = converter.with_missing_null_counts_as_zero(false);
+    assert_eq!(
+        converter.row_group_null_counts([&metadata]).unwrap(),
+        UInt64Array::from_iter(vec![None])
+    );
+}
+
+/// return an Arrow schema and corresponding Parquet SchemaDescriptor for
+/// a schema with a single boolean column "b"
+fn bool_arrow_and_parquet_schema() -> (SchemaRef, SchemaDescPtr) {
+    let arrow_schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Boolean, true)]));
+    use parquet::schema::types::Type as ParquetType;
+    let parquet_schema = ParquetType::group_type_builder("schema")
+        .with_fields(vec![Arc::new(
+            ParquetType::primitive_type_builder("a", parquet::basic::Type::INT32)
+                .build()
+                .unwrap(),
+        )])
+        .build()
+        .unwrap();
+
+    let parquet_schema = Arc::new(SchemaDescriptor::new(Arc::new(parquet_schema)));
+    (arrow_schema, parquet_schema)
+}
+
 /////// NEGATIVE TESTS ///////
 // column not found
 #[tokio::test]
```
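Putting the pieces together, downstream readers that must handle files from older parquet-rs writers can choose how to interpret a missing null count. The helper below is an illustrative sketch of that decision, not library code; `effective_null_count` and its writer-detection flag are assumptions for the example:

```rust
use parquet::file::statistics::Statistics;

/// Illustrative helper: interpret a possibly-missing null count from an
/// older file (pre-53.0.0 parquet-rs omitted null counts that were zero).
fn effective_null_count(stats: &Statistics, written_by_old_parquet_rs: bool) -> Option<u64> {
    match stats.null_count_opt() {
        Some(n) => Some(n),
        // An absent count from an old parquet-rs writer most likely means
        // "no nulls"; from an unknown writer it is genuinely unknown.
        None if written_by_old_parquet_rs => Some(0),
        None => None,
    }
}
```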
