JoinOptimization: Add build side pushdown to probe side #13054

Open · wants to merge 1 commit into base: main
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -636,6 +636,10 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

/// When set to true, DataFusion will try to push build-side statistics
/// from a hash join down to the probe-side scan
pub dynamic_join_pushdown: bool, default = true
}
}
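For reference, here is a minimal sketch of toggling the new option from user code. It assumes the flag is surfaced as `datafusion.optimizer.dynamic_join_pushdown`, mirroring how the neighboring options in this config namespace are exposed; only standard `SessionConfig`/`SessionContext` APIs are used.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Sketch: build a session with the new dynamic join pushdown disabled.
// The config key is assumed from the namespace above; it is not spelled out in this diff.
fn session_without_dynamic_pushdown() -> SessionContext {
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.dynamic_join_pushdown", false);
    SessionContext::new_with_config(config)
}
```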

29 changes: 22 additions & 7 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -21,12 +21,6 @@
//! Note: Most traits here need to be marked `Sync + Send` to be
//! compliant with the `SendableRecordBatchStream` trait.

use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector;
use crate::datasource::physical_plan::{FileMeta, FileScanConfig};
@@ -35,13 +29,19 @@ use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
use crate::physical_plan::RecordBatchStream;
use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;
use datafusion_common::ScalarValue;

use datafusion_physical_plan::joins::DynamicFilterInfo;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
@@ -96,6 +96,8 @@ pub struct FileStream<F: FileOpener> {
baseline_metrics: BaselineMetrics,
/// Describes the behavior of the `FileStream` if file opening or scanning fails
on_error: OnError,
/// Optional dynamic filters pushed down from a join's build side
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
}

/// Represents the state of the next `FileOpenFuture`. Since we need to poll
@@ -273,6 +275,7 @@ impl<F: FileOpener> FileStream<F> {
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
baseline_metrics: BaselineMetrics::new(metrics, partition),
on_error: OnError::Fail,
dynamic_filters: None,
})
}

@@ -284,6 +287,14 @@ impl<F: FileOpener> FileStream<F> {
self.on_error = on_error;
self
}
/// Set the dynamic filters to apply to batches produced by this stream
pub fn with_dynamic_filter(
mut self,
dynamic_filter: Option<Arc<DynamicFilterInfo>>,
) -> Self {
self.dynamic_filters = dynamic_filter;
self
}

/// Begin opening the next file in parallel while decoding the current file in FileStream.
///
@@ -391,7 +402,11 @@
}
}
match ready!(reader.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
Some(Ok(mut batch)) => {
// If a dynamic filter is available, apply it to the batch before emitting it
if let Some(dynamic_filters) = &self.dynamic_filters {
batch = dynamic_filters.filter_batch(&batch)?
}
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
let result = self
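For intuition about what `filter_batch` accomplishes here, the following stand-alone helper shows one way a probe-side batch could be reduced using build-side membership information. It is a hypothetical stand-in, not the `DynamicFilterInfo` implementation from this PR; the membership callback is an assumption used purely for illustration.

```rust
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: keep only the probe-side rows that could match the build side.
/// `row_matches_build_side` stands in for whatever membership test the dynamic filter carries.
fn apply_dynamic_filter(
    batch: &RecordBatch,
    row_matches_build_side: &dyn Fn(&RecordBatch, usize) -> bool,
) -> Result<RecordBatch, ArrowError> {
    // One boolean mask entry per row in the scanned batch.
    let mask: BooleanArray = (0..batch.num_rows())
        .map(|row| Some(row_matches_build_side(batch, row)))
        .collect();
    // Arrow's filter kernel keeps only the rows whose mask entry is true.
    filter_record_batch(batch, &mask)
}
```

Filtering this early means probe-side rows that cannot find a join partner are dropped before they ever reach the join operator.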
50 changes: 44 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -42,6 +42,8 @@ use crate::{
use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};

use datafusion_physical_plan::joins::DynamicFilterInfo;
use datafusion_physical_plan::Metric;
use itertools::Itertools;
use log::debug;

Expand Down Expand Up @@ -282,6 +284,8 @@ pub struct ParquetExec {
table_parquet_options: TableParquetOptions,
/// Optional user defined schema adapter
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Optional dynamic filters (e.g. join filters) pushed down to this scan
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
}

impl From<ParquetExec> for ParquetExecBuilder {
@@ -291,7 +295,6 @@ impl From<ParquetExec> for ParquetExecBuilder {
}

/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
///
/// See example on [`ParquetExec`].
pub struct ParquetExecBuilder {
file_scan_config: FileScanConfig,
@@ -463,6 +466,7 @@ impl ParquetExecBuilder {
cache,
table_parquet_options,
schema_adapter_factory,
dynamic_filters: None,
}
}
}
@@ -515,6 +519,7 @@ impl ParquetExec {
cache: _,
table_parquet_options,
schema_adapter_factory,
..
} = self;
ParquetExecBuilder {
file_scan_config: base_config,
@@ -711,10 +716,9 @@ impl DisplayAs for ParquetExec {
)
})
.unwrap_or_default();

write!(f, "ParquetExec: ")?;
self.base_config.fmt_as(t, f)?;
write!(f, "{}{}", predicate_string, pruning_predicate_string,)
write!(f, "{}{}", predicate_string, pruning_predicate_string)
}
}
}
@@ -798,7 +802,16 @@ impl ExecutionPlan for ParquetExec {
.schema_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));

if let Some(dynamic_filter) = &self.dynamic_filters {
let (final_expr, name) =
dynamic_filter.final_predicate(self.predicate.clone());
if final_expr.is_some() {
self.metrics.register(Arc::new(Metric::new(
datafusion_physical_plan::metrics::MetricValue::DynamicFilter(name),
None,
)));
}
}
let opener = ParquetOpener {
partition_index,
projection: Arc::from(projection),
@@ -817,9 +830,9 @@
enable_bloom_filter: self.bloom_filter_on_read(),
schema_adapter_factory,
};

let stream =
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?;
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?
.with_dynamic_filter(self.dynamic_filters.clone());

Ok(Box::pin(stream))
}
@@ -862,8 +875,33 @@ impl ExecutionPlan for ParquetExec {
cache: self.cache.clone(),
table_parquet_options: self.table_parquet_options.clone(),
schema_adapter_factory: self.schema_adapter_factory.clone(),
dynamic_filters: self.dynamic_filters.clone(),
}))
}

fn support_dynamic_filter(&self) -> bool {
true
}

fn with_dynamic_filter(
&self,
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(Some(Arc::new(ParquetExec {
base_config: self.base_config.clone(),
projected_statistics: self.projected_statistics.clone(),
metrics: self.metrics.clone(),
predicate: self.predicate.clone(),
pruning_predicate: self.pruning_predicate.clone(),
page_pruning_predicate: self.page_pruning_predicate.clone(),
metadata_size_hint: self.metadata_size_hint,
parquet_file_reader_factory: self.parquet_file_reader_factory.clone(),
cache: self.cache.clone(),
table_parquet_options: self.table_parquet_options.clone(),
schema_adapter_factory: self.schema_adapter_factory.clone(),
dynamic_filters,
})))
}
}

fn should_enable_page_index(
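The two overridden methods above imply default implementations on the `ExecutionPlan` trait itself (the reviewer thread further down discusses exactly this generalization), but those defaults are not part of this diff. The stand-alone trait below is only a sketch of what conservative defaults could look like; the trait name is hypothetical.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_plan::joins::DynamicFilterInfo;
use datafusion_physical_plan::ExecutionPlan;

/// Hypothetical mirror of the two methods this PR adds to `ExecutionPlan`,
/// shown only to illustrate plausible defaults.
pub trait DynamicFilterSupport {
    /// By default an operator cannot consume a dynamic filter directly.
    fn support_dynamic_filter(&self) -> bool {
        false
    }

    /// Returning `Ok(None)` means "no rewritten plan", so a caller such as the
    /// `JoinFilterPushdown` rule would leave the node untouched.
    fn with_dynamic_filter(
        &self,
        _dynamic_filters: Option<Arc<DynamicFilterInfo>>,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
}
```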
@@ -304,6 +304,7 @@ fn adjust_input_keys_ordering(
projection.clone(),
PartitionMode::Partitioned,
*null_equals_null,
None,
)
.map(|e| Arc::new(e) as _)
};
@@ -632,6 +633,7 @@ pub(crate) fn reorder_join_keys_to_inputs(
projection.clone(),
PartitionMode::Partitioned,
*null_equals_null,
None,
)?));
}
}
@@ -1696,6 +1698,7 @@ pub(crate) mod tests {
None,
PartitionMode::Partitioned,
false,
None,
)
.unwrap(),
)
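The hunks above only thread the new trailing argument through existing `HashJoinExec::try_new` call sites, passing `None`. Based on the nine-argument call inside the new optimizer rule below, a hedged sketch of a call site that actually supplies a dynamic filter could look like this; all inputs are placeholders, and the `JoinOn` alias is assumed to be the existing `datafusion_physical_plan::joins::utils::JoinOn`.

```rust
use std::sync::Arc;

use datafusion_common::{JoinType, Result};
use datafusion_physical_plan::joins::utils::JoinOn;
use datafusion_physical_plan::joins::{DynamicFilterInfo, HashJoinExec, PartitionMode};
use datafusion_physical_plan::ExecutionPlan;

/// Sketch: the trailing `Option<Arc<DynamicFilterInfo>>` is the only argument this PR adds.
fn build_join(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    dynamic_filters: Option<Arc<DynamicFilterInfo>>,
) -> Result<Arc<HashJoinExec>> {
    Ok(Arc::new(HashJoinExec::try_new(
        left,
        right,
        on,
        None, // no extra join filter in this sketch
        &JoinType::Inner,
        None, // no output projection
        PartitionMode::Partitioned,
        false, // null_equals_null
        dynamic_filters, // new: build-side info to push to the probe side
    )?))
}
```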
136 changes: 136 additions & 0 deletions datafusion/core/src/physical_optimizer/join_filter_pushdown.rs
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Push dynamic join filters down to the scan execution plans, if any exist

use std::collections::VecDeque;
use std::sync::Arc;

use crate::datasource::physical_plan::ParquetExec;
use crate::physical_plan::ExecutionPlan;
use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::joins::DynamicFilterInfo;

/// This rule pushes build-side statistics from hash joins down to the probe side
#[derive(Default, Debug)]
pub struct JoinFilterPushdown {}

impl JoinFilterPushdown {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for JoinFilterPushdown {
Review comment (Contributor): This is executed at plan time; wouldn't it be reasonable to run it during execution of the HashJoin instead, after the build side has been loaded?

fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if !config.optimizer.dynamic_join_pushdown {
return Ok(plan);
}

let mut filters_stack = VecDeque::new();
optimize_impl(plan, &mut filters_stack).data()
}

fn name(&self) -> &str {
"JoinFilterPushdown"
}

fn schema_check(&self) -> bool {
true
}
}

fn optimize_impl(
plan: Arc<dyn ExecutionPlan>,
join_filters_stack: &mut VecDeque<Arc<DynamicFilterInfo>>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
if let Some(hashjoin_exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
// Push current join's filters to the stack if they exist
if let Some(filters) = &hashjoin_exec.dynamic_filters_pushdown {
join_filters_stack.push_back(Arc::clone(filters));
}

// Optimize both sides
let new_right = optimize_impl(
Arc::<dyn ExecutionPlan>::clone(&hashjoin_exec.right),
join_filters_stack,
)?;

let new_left = optimize_impl(
Arc::<dyn ExecutionPlan>::clone(&hashjoin_exec.left),
join_filters_stack,
)?;

// Pop the filters after processing both sides
if hashjoin_exec.dynamic_filters_pushdown.is_some() {
join_filters_stack.pop_back();
}

if new_left.transformed || new_right.transformed {
let new_hash_join = Arc::new(HashJoinExec::try_new(
new_left.data,
new_right.data,
hashjoin_exec.on.clone(),
hashjoin_exec.filter().cloned(),
hashjoin_exec.join_type(),
hashjoin_exec.projection.clone(),
*hashjoin_exec.partition_mode(),
hashjoin_exec.null_equals_null(),
hashjoin_exec.dynamic_filters_pushdown.clone(),
)?);
return Ok(Transformed::yes(new_hash_join));
}
Ok(Transformed::no(plan))
} else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
Review comment (Contributor): I wonder if we could allow this cool optimization on any ExecutionPlan by adding "with_dynamic_filter" to the ExecutionPlan trait? Maybe also a "supports_dynamic_filter" to know when to call "with_dynamic_filter". Same principle as the existing static filter pushdown setup.

Review comment (Contributor): That makes a lot of sense.

Review comment (Contributor Author, @Lordworms, Oct 22, 2024): Sure, I'll refactor this, but could we support ParquetExec first? For other physical scan operators, the way to add a dynamic filter is to place a FilterExec above them, whereas for Parquet we can use the predicate to add filters dynamically.

if let Some(filter) = join_filters_stack.pop_front() {
let final_exec = parquet_exec.clone().with_dynamic_filter(Some(filter))?;
if let Some(new_plan) = final_exec {
return Ok(Transformed::yes(new_plan));
}
}
Ok(Transformed::no(plan))
} else {
let children = plan.children();
let mut new_children = Vec::with_capacity(children.len());
let mut transformed = false;

for child in children {
let new_child = optimize_impl(
Arc::<dyn ExecutionPlan>::clone(child),
join_filters_stack,
)?;
if new_child.transformed {
transformed = true;
}
new_children.push(new_child.data);
}

if transformed {
let new_plan = plan.with_new_children(new_children)?;
Ok(Transformed::yes(new_plan))
} else {
Ok(Transformed::no(plan))
}
}
}
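Because `JoinFilterPushdown` is an ordinary `PhysicalOptimizerRule`, it can be exercised directly against a physical plan. A minimal sketch, assuming the module ends up exported as `datafusion::physical_optimizer::join_filter_pushdown` (the re-export is not visible in this diff):

```rust
use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::join_filter_pushdown::JoinFilterPushdown; // assumed path
use datafusion::physical_plan::ExecutionPlan;
use datafusion_physical_optimizer::PhysicalOptimizerRule;

/// Run the new rule over an existing plan. With the config flag enabled (its
/// default), HashJoinExec nodes that carry dynamic filters get their probe-side
/// ParquetExec rewritten via `with_dynamic_filter`.
fn apply_join_filter_pushdown(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let mut config = ConfigOptions::default();
    config.optimizer.dynamic_join_pushdown = true; // the flag added by this PR
    JoinFilterPushdown::new().optimize(plan, &config)
}
```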