
Commit

init

Lordworms committed Oct 22, 2024
1 parent 34fbe8e commit e96004d
Showing 29 changed files with 926 additions and 64 deletions.
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -617,6 +617,10 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

/// When set to true, DataFusion will try to push the build side statistics
/// of a hash join down to the probe phase
pub dynamic_join_pushdown: bool, default = true
}
}
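The new flag sits in the optimizer config_namespace! block alongside expand_views_at_output, and the rule below reads it as config.optimizer.dynamic_join_pushdown, so the session key should be datafusion.optimizer.dynamic_join_pushdown. A minimal sketch of toggling it (the key name is inferred from the namespace, not spelled out in this diff):

use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Disable the pushdown when building the session...
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.dynamic_join_pushdown", false);
    let ctx = SessionContext::new_with_config(config);

    // ...or flip it back on later via SQL, like any other runtime option.
    ctx.sql("SET datafusion.optimizer.dynamic_join_pushdown = true")
        .await?;
    Ok(())
}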

32 changes: 28 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -42,6 +42,7 @@ use crate::{
use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};

use datafusion_physical_plan::joins::DynamicFilterInfo;
use itertools::Itertools;
use log::debug;

@@ -282,6 +283,8 @@ pub struct ParquetExec {
table_parquet_options: TableParquetOptions,
/// Optional user defined schema adapter
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Dynamic filters (such as join filters) pushed down by the optimizer
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
}

impl From<ParquetExec> for ParquetExecBuilder {
@@ -291,7 +294,6 @@ impl From<ParquetExec> for ParquetExecBuilder {
}

/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
///
/// See example on [`ParquetExec`].
pub struct ParquetExecBuilder {
file_scan_config: FileScanConfig,
@@ -463,6 +465,7 @@ impl ParquetExecBuilder {
cache,
table_parquet_options,
schema_adapter_factory,
dynamic_filters: None,
}
}
}
@@ -515,6 +518,7 @@ impl ParquetExec {
cache: _,
table_parquet_options,
schema_adapter_factory,
..
} = self;
ParquetExecBuilder {
file_scan_config: base_config,
@@ -579,6 +583,15 @@ impl ParquetExec {
self
}

/// Set the dynamic filter (e.g. a join filter pushed down from a hash join)
pub fn with_dynamic_filter(
mut self,
dynamic_filter: Option<Arc<DynamicFilterInfo>>,
) -> Self {
self.dynamic_filters = dynamic_filter;
self
}
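A hedged sketch of how a caller attaches a filter while rebuilding the node; parquet_exec and dynamic_filters are illustrative names for an existing scan and an Arc<DynamicFilterInfo> produced by a join (the real call site is the JoinFilterPushdown rule further down):

// Rebuild the scan with a join-produced dynamic filter attached.
let filtered_exec = Arc::new(
    parquet_exec
        .clone()
        .with_dynamic_filter(Some(Arc::clone(&dynamic_filters))),
);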

/// If true, the predicate will be used during the parquet scan.
/// Defaults to false
///
@@ -711,10 +724,15 @@ impl DisplayAs for ParquetExec {
)
})
.unwrap_or_default();

let dynamic_filter = self
.dynamic_filters
.as_ref()
.map(|filters| format!(", dynamic_filter: {:?}", filters))
.unwrap_or_default();
write!(f, "ParquetExec: ")?;
self.base_config.fmt_as(t, f)?;
write!(f, "{}{}", predicate_string, pruning_predicate_string,)
write!(
f,
"{}{}{}",
predicate_string, pruning_predicate_string, dynamic_filter
)
}
}
}
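With the extra fragment, a plan that carries a filter renders roughly as below (illustrative only; the exact tail depends on DynamicFilterInfo's Debug output):

// ParquetExec: file_groups={1 group: [...]}, projection=[a, b],
//              predicate=a@0 > 1, dynamic_filter: Some(DynamicFilterInfo { .. })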
@@ -798,13 +816,18 @@ impl ExecutionPlan for ParquetExec {
.schema_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
let final_predicate = if let Some(dynamic_filter) = &self.dynamic_filters {
dynamic_filter.final_predicate(self.predicate.clone())
} else {
self.predicate.clone()
};

let opener = ParquetOpener {
partition_index,
projection: Arc::from(projection),
batch_size: ctx.session_config().batch_size(),
limit: self.base_config.limit,
predicate: self.predicate.clone(),
predicate: final_predicate,
pruning_predicate: self.pruning_predicate.clone(),
page_pruning_predicate: self.page_pruning_predicate.clone(),
table_schema: self.base_config.file_schema.clone(),
@@ -862,6 +885,7 @@ impl ExecutionPlan for ParquetExec {
cache: self.cache.clone(),
table_parquet_options: self.table_parquet_options.clone(),
schema_adapter_factory: self.schema_adapter_factory.clone(),
dynamic_filters: self.dynamic_filters.clone(),
}))
}
}
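DynamicFilterInfo::final_predicate, called in execute above, is defined elsewhere in this commit and its body is not shown here. Purely as an assumption, one plausible shape is a conjunction of the scan's static predicate with the dynamically built join filter, sketched here with the real BinaryExpr and Operator types:

use std::sync::Arc;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::PhysicalExpr;

// Hypothetical sketch, not the commit's actual implementation: AND the
// static scan predicate (if any) with the dynamic join filter.
fn conjoin(
    static_pred: Option<Arc<dyn PhysicalExpr>>,
    dynamic_pred: Arc<dyn PhysicalExpr>,
) -> Option<Arc<dyn PhysicalExpr>> {
    Some(match static_pred {
        Some(p) => Arc::new(BinaryExpr::new(p, Operator::And, dynamic_pred)),
        None => dynamic_pred,
    })
}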
datafusion/core/src/datasource/physical_plan/parquet/opener.rs

@@ -160,7 +160,6 @@ impl FileOpener for ParquetOpener {
&file_metrics,
Arc::clone(&schema_mapping),
);

match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
datafusion/core/src/physical_optimizer/enforce_distribution.rs

@@ -52,11 +52,10 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
};
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;
use itertools::izip;

/// The `EnforceDistribution` rule ensures that distribution requirements are
112 changes: 112 additions & 0 deletions datafusion/core/src/physical_optimizer/join_filter_pushdown.rs
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Push dynamic join filters down to the scan execution node, if any exist
use std::sync::Arc;

use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};

use crate::datasource::physical_plan::ParquetExec;
use crate::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::joins::DynamicFilterInfo;

/// This rule pushes the build side statistics of a hash join down to the probe phase
#[derive(Default, Debug)]
pub struct JoinFilterPushdown {}

impl JoinFilterPushdown {
/// Create a new [`JoinFilterPushdown`] optimizer rule
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for JoinFilterPushdown {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if !config.optimizer.dynamic_join_pushdown {
return Ok(plan);
}
optimize_impl(plan, &mut None).data()
}

fn name(&self) -> &str {
"JoinFilterPushdown"
}

fn schema_check(&self) -> bool {
true
}
}

fn optimize_impl(
plan: Arc<dyn ExecutionPlan>,
join_filters: &mut Option<Arc<DynamicFilterInfo>>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
if let Some(hashjoin_exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
join_filters.clone_from(&hashjoin_exec.dynamic_filters_pushdown);
let new_right = optimize_impl(hashjoin_exec.right.clone(), join_filters)?;
if new_right.transformed {
let new_hash_join = HashJoinExec::try_new(
hashjoin_exec.left().clone(),
new_right.data,
hashjoin_exec.on.clone(),
hashjoin_exec.filter().cloned(),
hashjoin_exec.join_type(),
hashjoin_exec.projection.clone(),
*hashjoin_exec.partition_mode(),
hashjoin_exec.null_equals_null(),
)?
.with_dynamic_filter_info(hashjoin_exec.dynamic_filters_pushdown.clone());
return Ok(Transformed::yes(Arc::new(new_hash_join)));
}
Ok(Transformed::no(plan))
} else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
if let Some(dynamic_filters) = join_filters {
let final_exec = parquet_exec
.clone()
.with_dynamic_filter(Some(dynamic_filters.clone()));
return Ok(Transformed::yes(Arc::new(final_exec)));
}
Ok(Transformed::no(plan))
} else {
let children = plan.children();
let mut new_children = Vec::with_capacity(children.len());
let mut transformed = false;

for child in children {
let new_child = optimize_impl(child.clone(), join_filters)?;
if new_child.transformed {
transformed = true;
}
new_children.push(new_child.data);
}

if transformed {
let new_plan = plan.with_new_children(new_children)?;
Ok(Transformed::yes(new_plan))
} else {
Ok(Transformed::no(plan))
}
}
}
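The rule is appended to the default optimizer list in optimizer.rs below, so it runs automatically; a custom session can also register it explicitly. A hedged sketch, assuming SessionStateBuilder's single-rule hook is available in this DataFusion version:

use std::sync::Arc;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_optimizer::join_filter_pushdown::JoinFilterPushdown;
use datafusion::prelude::SessionContext;

fn context_with_pushdown() -> SessionContext {
    let state = SessionStateBuilder::new()
        .with_default_features()
        // Append the pushdown rule after the built-in defaults.
        .with_physical_optimizer_rule(Arc::new(JoinFilterPushdown::new()))
        .build();
    SessionContext::new_with_state(state)
}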
23 changes: 13 additions & 10 deletions datafusion/core/src/physical_optimizer/join_selection.rs
@@ -383,16 +383,19 @@ fn try_collect_left(
{
Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
} else {
Ok(Some(Arc::new(HashJoinExec::try_new(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.projection.clone(),
PartitionMode::CollectLeft,
hash_join.null_equals_null(),
)?)))
Ok(Some(Arc::new(
HashJoinExec::try_new(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.projection.clone(),
PartitionMode::CollectLeft,
hash_join.null_equals_null(),
)?
.with_dynamic_filter_info(hash_join.dynamic_filters_pushdown.clone()),
)))
}
}
(true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
@@ -24,6 +24,7 @@
pub mod coalesce_batches;
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_filter_pushdown;
pub mod join_selection;
pub mod optimizer;
pub mod projection_pushdown;
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/optimizer.rs
@@ -20,6 +20,7 @@
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use std::sync::Arc;

use super::join_filter_pushdown::JoinFilterPushdown;
use super::projection_pushdown::ProjectionPushdown;
use super::update_aggr_exprs::OptimizeAggregateOrder;
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -112,6 +113,7 @@ impl PhysicalOptimizer {
// given query plan; i.e. it only acts as a final
// gatekeeping rule.
Arc::new(SanityCheckPlan::new()),
Arc::new(JoinFilterPushdown::new()),
];

Self::with_rules(rules)