Implement the contained method of RowGroupPruningStatistics
kyotoYaho committed Dec 28, 2023
1 parent 6403222 commit 21e7242
Showing 1 changed file with 67 additions and 6 deletions.
73 changes: 67 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.

+use arrow::compute::{can_cast_types, cast_with_options, CastOptions};
 use arrow::{array::ArrayRef, datatypes::Schema};
-use arrow_array::BooleanArray;
-use arrow_schema::FieldRef;
-use datafusion_common::{Column, ScalarValue};
+use arrow_array::{Array, BooleanArray};
+use arrow_schema::{DataType, FieldRef};
+use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
+use datafusion_common::{Column, DataFusionError, ScalarValue};
 use parquet::file::metadata::ColumnChunkMetaData;
 use parquet::schema::types::SchemaDescriptor;
 use parquet::{
@@ -276,15 +278,74 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
         scalar.to_array().ok()
     }

+    /// The basic idea is to check whether all of the `values` fall outside the min-max boundary.
+    /// If any one value is within the min-max boundary, this row group cannot be skipped.
+    /// Otherwise, this row group can be skipped.
     fn contained(
         &self,
-        _column: &Column,
-        _values: &HashSet<ScalarValue>,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray> {
-        None
+        let min_values = self.min_values(column)?;
+        let max_values = self.max_values(column)?;
+        // The min/max arrays should each have a length of 1
+        if min_values.len() != max_values.len() || min_values.len() != 1 {
+            return None;
+        }
+        let min_value = ScalarValue::try_from_array(min_values.as_ref(), 0).ok()?;
+        let max_value = ScalarValue::try_from_array(max_values.as_ref(), 0).ok()?;

+        // The min and max values should have the same data type
+        if min_value.data_type() != max_value.data_type() {
+            return None;
+        }
+        let target_data_type = min_value.data_type();

+        let (c, _) = self.column(&column.name)?;
+        let has_null = c.statistics()?.null_count() > 0;
+        let mut known_not_present = true;
+        for value in values {
+            // If the value is null, check whether the statistics report any nulls
+            if has_null && value.is_null() {
+                known_not_present = false;
+                break;
+            }
+            // The filter values should be cast to the boundary's data type
+            if !can_cast_types(&value.data_type(), &target_data_type) {
+                return None;
+            }
+            let value =
+                cast_scalar_value(value, &target_data_type, &DEFAULT_CAST_OPTIONS)
+                    .ok()?;

+            // If the filter value is within the boundary, this row group cannot be filtered out
+            if value >= min_value && value <= max_value {
+                known_not_present = false;
+                break;
+            }
+        }

+        let contains = if known_not_present { Some(false) } else { None };

+        Some(BooleanArray::from(vec![contains]))
     }
 }
+
+const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
+    safe: false,
+    format_options: DEFAULT_FORMAT_OPTIONS,
+};
+
+/// Cast scalar value to the given data type using an arrow kernel.
+fn cast_scalar_value(
+    value: &ScalarValue,
+    data_type: &DataType,
+    cast_options: &CastOptions,
+) -> Result<ScalarValue, DataFusionError> {
+    let cast_array = cast_with_options(&value.to_array()?, data_type, cast_options)?;
+    ScalarValue::try_from_array(&cast_array, 0)
+}

 #[cfg(test)]
 mod tests {
     use super::*;
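Not part of the commit itself, but as a reading aid: a minimal, hypothetical sketch of how the single-element BooleanArray returned by `contained` encodes the pruning decision. The helper name `contained_sketch`, the Int32 boundaries, and the `main` driver are illustrative assumptions; the null-count and cast handling of the real method are omitted.

use std::collections::HashSet;

use arrow_array::BooleanArray;
use datafusion_common::ScalarValue;

/// Hypothetical standalone mirror of the containment check in the commit:
/// `Some(false)` means "known not present", `None` means "unknown, keep the row group".
fn contained_sketch(
    min: &ScalarValue,
    max: &ScalarValue,
    values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
    let mut known_not_present = true;
    for value in values {
        // Any literal inside [min, max] means the row group may contain it.
        if value >= min && value <= max {
            known_not_present = false;
            break;
        }
    }
    let contains = if known_not_present { Some(false) } else { None };
    Some(BooleanArray::from(vec![contains]))
}

fn main() {
    // Hypothetical row-group statistics: min = 10, max = 20.
    let min = ScalarValue::Int32(Some(10));
    let max = ScalarValue::Int32(Some(20));

    // `col IN (1, 2)`: every literal lies outside [10, 20], so the row group
    // is known not to contain any of them and can be skipped.
    let outside: HashSet<ScalarValue> =
        [ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(2))]
            .into_iter()
            .collect();
    assert_eq!(
        contained_sketch(&min, &max, &outside),
        Some(BooleanArray::from(vec![Some(false)]))
    );

    // `col IN (1, 15)`: 15 falls inside the boundary, so the answer is
    // unknown (NULL) and the row group must be kept.
    let maybe: HashSet<ScalarValue> =
        [ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(15))]
            .into_iter()
            .collect();
    assert_eq!(
        contained_sketch(&min, &max, &maybe),
        Some(BooleanArray::from(vec![None::<bool>]))
    );
}

Note that the result is only ever `Some(false)` or `None`: min/max statistics can prove that values are absent from a row group, but never that they are present.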
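Also not part of the commit: a small standalone sketch of the cast step, duplicating the `cast_scalar_value` helper from the diff, to show why `safe: false` matters. The `main` driver and the specific literals are assumptions for illustration.

use arrow::compute::{can_cast_types, cast_with_options, CastOptions};
use arrow_schema::DataType;
use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
use datafusion_common::{DataFusionError, ScalarValue};

/// Same shape as the `cast_scalar_value` helper in the diff: round-trip the
/// scalar through a one-element array so the arrow cast kernel can be reused.
fn cast_scalar_value(
    value: &ScalarValue,
    data_type: &DataType,
    cast_options: &CastOptions,
) -> Result<ScalarValue, DataFusionError> {
    let cast_array = cast_with_options(&value.to_array()?, data_type, cast_options)?;
    ScalarValue::try_from_array(&cast_array, 0)
}

fn main() -> Result<(), DataFusionError> {
    let options = CastOptions {
        safe: false,
        format_options: DEFAULT_FORMAT_OPTIONS,
    };

    // An Int32 filter literal compared against Int64 row-group statistics:
    // `can_cast_types` admits the cast, and the literal is widened so that
    // the comparison happens in the boundary's data type.
    let literal = ScalarValue::Int32(Some(42));
    assert!(can_cast_types(&literal.data_type(), &DataType::Int64));
    let widened = cast_scalar_value(&literal, &DataType::Int64, &options)?;
    assert_eq!(widened, ScalarValue::Int64(Some(42)));

    // With `safe: false`, a lossy cast (e.g. an out-of-range value) returns an
    // error instead of silently producing NULL; the caller turns that error
    // into "cannot prune" via `.ok()?`.
    let too_big = ScalarValue::Int64(Some(i64::MAX));
    assert!(cast_scalar_value(&too_big, &DataType::Int32, &options).is_err());
    Ok(())
}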
