Skip to content

Commit

Permalink
Rewrite bloom filters to use contains API (#8442)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Dec 26, 2023
1 parent ec8fd44 commit e10d3e2
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ impl FileOpener for ParquetOpener {
if enable_bloom_filter && !row_groups.is_empty() {
if let Some(predicate) = predicate {
row_groups = row_groups::prune_row_groups_by_bloom_filters(
&file_schema,
&mut builder,
&row_groups,
file_metadata.row_groups(),
Expand Down
245 changes: 90 additions & 155 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,21 @@
use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
use datafusion_common::{Column, ScalarValue};
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
bloom_filter::Sbbf,
file::metadata::RowGroupMetaData,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::collections::{HashMap, HashSet};

use crate::datasource::listing::FileRange;
use crate::datasource::physical_plan::parquet::statistics::{
max_statistics, min_statistics, parquet_column,
};
use crate::logical_expr::Operator;
use crate::physical_expr::expressions as phys_expr;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::PhysicalExpr;

use super::ParquetFileMetrics;

Expand Down Expand Up @@ -118,188 +111,129 @@ pub(crate) fn prune_row_groups_by_statistics(
pub(crate) async fn prune_row_groups_by_bloom_filters<
T: AsyncFileReader + Send + 'static,
>(
arrow_schema: &Schema,
builder: &mut ParquetRecordBatchStreamBuilder<T>,
row_groups: &[usize],
groups: &[RowGroupMetaData],
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) -> Vec<usize> {
let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
{
Ok(predicates) => predicates,
Err(_) => {
return row_groups.to_vec();
}
};
let mut filtered = Vec::with_capacity(groups.len());
for idx in row_groups {
let rg_metadata = &groups[*idx];
// get all columns bloom filter
let mut column_sbbf =
HashMap::with_capacity(bf_predicates.required_columns.len());
for column_name in bf_predicates.required_columns.iter() {
let column_idx = match rg_metadata
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string().eq(column_name))
{
Some((column_idx, _)) => column_idx,
None => continue,
// get all columns in the predicate that we could use a bloom filter with
let literal_columns = predicate.literal_columns();
let mut column_sbbf = HashMap::with_capacity(literal_columns.len());

for column_name in literal_columns {
let Some((column_idx, _field)) =
parquet_column(builder.parquet_schema(), arrow_schema, &column_name)
else {
continue;
};

let bf = match builder
.get_row_group_column_bloom_filter(*idx, column_idx)
.await
{
Ok(bf) => match bf {
Some(bf) => bf,
None => {
continue;
}
},
Ok(Some(bf)) => bf,
Ok(None) => continue, // no bloom filter for this column
Err(e) => {
log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}");
log::debug!("Ignoring error reading bloom filter: {e}");
metrics.predicate_evaluation_errors.add(1);
continue;
}
};
column_sbbf.insert(column_name.to_owned(), bf);
column_sbbf.insert(column_name.to_string(), bf);
}
if bf_predicates.prune(&column_sbbf) {

let stats = BloomFilterStatistics { column_sbbf };

// Can this group be pruned?
let prune_group = match predicate.prune(&stats) {
Ok(values) => !values[0],
Err(e) => {
log::debug!("Error evaluating row group predicate on bloom filter: {e}");
metrics.predicate_evaluation_errors.add(1);
false
}
};

if prune_group {
metrics.row_groups_pruned.add(1);
continue;
} else {
filtered.push(*idx);
}
filtered.push(*idx);
}
filtered
}

struct BloomFilterPruningPredicate {
/// Actual pruning predicate
predicate_expr: Option<phys_expr::BinaryExpr>,
/// The statistics required to evaluate this predicate
required_columns: Vec<String>,
/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF)
struct BloomFilterStatistics {
/// Maps column name to the parquet bloom filter
column_sbbf: HashMap<String, Sbbf>,
}

impl BloomFilterPruningPredicate {
fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
match binary_expr {
Some(binary_expr) => {
let columns = Self::get_predicate_columns(expr);
Ok(Self {
predicate_expr: Some(binary_expr.clone()),
required_columns: columns.into_iter().collect(),
})
}
None => Err(DataFusionError::Execution(
"BloomFilterPruningPredicate only support binary expr".to_string(),
)),
}
impl PruningStatistics for BloomFilterStatistics {
fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
None
}

fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
None
}

/// Return true if the `expr` can be proved not `true`
/// based on the bloom filter.
///
/// We only checked `BinaryExpr` but it also support `InList`,
/// Because of the `optimizer` will convert `InList` to `BinaryExpr`.
fn prune_expr_with_bloom_filter(
expr: Option<&phys_expr::BinaryExpr>,
column_sbbf: &HashMap<String, Sbbf>,
) -> bool {
let Some(expr) = expr else {
// unsupported predicate
return false;
};
match expr.op() {
Operator::And | Operator::Or => {
let left = Self::prune_expr_with_bloom_filter(
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
let right = Self::prune_expr_with_bloom_filter(
expr.right()
.as_any()
.downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
match expr.op() {
Operator::And => left || right,
Operator::Or => left && right,
_ => false,
}
}
Operator::Eq => {
if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
if let Some(sbbf) = column_sbbf.get(col.name()) {
match val {
ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
_ => false,
}
} else {
false
}
} else {
false
}
}
_ => false,
}
fn num_containers(&self) -> usize {
1
}

fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
let mut columns = HashSet::new();
expr.apply(&mut |expr| {
if let Some(binary_expr) =
expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
{
if let Some((column, _)) =
Self::check_expr_is_col_equal_const(binary_expr)
{
columns.insert(column.name().to_string());
}
}
Ok(VisitRecursion::Continue)
})
// no way to fail as only Ok(VisitRecursion::Continue) is returned
.unwrap();

columns
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}

fn check_expr_is_col_equal_const(
exr: &phys_expr::BinaryExpr,
) -> Option<(phys_expr::Column, ScalarValue)> {
if Operator::Eq.ne(exr.op()) {
return None;
}
/// Use bloom filters to determine if we are sure this column can not
/// possibly contain `values`
///
/// The `contained` API returns false if the bloom filters knows that *ALL*
/// of the values in a column are not present.
fn contained(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
let sbbf = self.column_sbbf.get(column.name.as_str())?;

let left_any = exr.left().as_any();
let right_any = exr.right().as_any();
if let (Some(col), Some(liter)) = (
left_any.downcast_ref::<phys_expr::Column>(),
right_any.downcast_ref::<phys_expr::Literal>(),
) {
return Some((col.clone(), liter.value().clone()));
}
if let (Some(liter), Some(col)) = (
left_any.downcast_ref::<phys_expr::Literal>(),
right_any.downcast_ref::<phys_expr::Column>(),
) {
return Some((col.clone(), liter.value().clone()));
}
None
// Bloom filters are probabilistic data structures that can return false
// positives (i.e. it might return true even if the value is not
// present) however, the bloom filter will return `false` if the value is
// definitely not present.

let known_not_present = values
.iter()
.map(|value| match value {
ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => sbbf.check(v),
ScalarValue::Float64(Some(v)) => sbbf.check(v),
ScalarValue::Float32(Some(v)) => sbbf.check(v),
ScalarValue::Int64(Some(v)) => sbbf.check(v),
ScalarValue::Int32(Some(v)) => sbbf.check(v),
ScalarValue::Int16(Some(v)) => sbbf.check(v),
ScalarValue::Int8(Some(v)) => sbbf.check(v),
_ => true,
})
// The row group doesn't contain any of the values if
// all the checks are false
.all(|v| !v);

let contains = if known_not_present {
Some(false)
} else {
// Given the bloom filter is probabilistic, we can't be sure that
// the row group actually contains the values. Return `None` to
// indicate this uncertainty
None
};

Some(BooleanArray::from(vec![contains]))
}
}

Expand Down Expand Up @@ -1367,6 +1301,7 @@ mod tests {

let metadata = builder.metadata().clone();
let pruned_row_group = prune_row_groups_by_bloom_filters(
pruning_predicate.schema(),
&mut builder,
row_groups,
metadata.row_groups(),
Expand Down

0 comments on commit e10d3e2

Please sign in to comment.