Skip to content

Commit

Permalink
make with_dynamic_filters part of execution_plan trait
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Oct 22, 2024
1 parent 2f8122b commit 7922441
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 10 deletions.
33 changes: 24 additions & 9 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,15 +583,6 @@ impl ParquetExec {
self
}

/// with the dynamic filter
pub fn with_dynamic_filter(
mut self,
dynamic_filter: Option<Arc<DynamicFilterInfo>>,
) -> Self {
self.dynamic_filters = dynamic_filter;
self
}

/// If true, the predicate will be used during the parquet scan.
/// Defaults to false
///
Expand Down Expand Up @@ -888,6 +879,30 @@ impl ExecutionPlan for ParquetExec {
dynamic_filters: self.dynamic_filters.clone(),
}))
}

fn support_dynamic_filter(&self) -> bool {
true
}

fn with_dynamic_filter(
&self,
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(ParquetExec {
base_config: self.base_config.clone(),
projected_statistics: self.projected_statistics.clone(),
metrics: self.metrics.clone(),
predicate: self.predicate.clone(),
pruning_predicate: self.pruning_predicate.clone(),
page_pruning_predicate: self.page_pruning_predicate.clone(),
metadata_size_hint: self.metadata_size_hint.clone(),
parquet_file_reader_factory: self.parquet_file_reader_factory.clone(),
cache: self.cache.clone(),
table_parquet_options: self.table_parquet_options.clone(),
schema_adapter_factory: self.schema_adapter_factory.clone(),
dynamic_filters: dynamic_filters,
}))
}
}

fn should_enable_page_index(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ fn optimize_impl(
let final_exec = parquet_exec
.clone()
.with_dynamic_filter(Some(dynamic_filters.clone()));
return Ok(Transformed::yes(Arc::new(final_exec)));
if let Some(plan) = final_exec {
return Ok(Transformed::yes(plan));
} else {
return Ok(Transformed::no(plan));
}
}
Ok(Transformed::no(plan))
} else {
Expand Down
14 changes: 14 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use crate::joins::DynamicFilterInfo;
pub use crate::metrics::Metric;
use crate::metrics::MetricsSet;
pub use crate::ordering::InputOrderMode;
Expand Down Expand Up @@ -421,6 +422,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::Unknown
}

/// whether it is a physical scan or not
fn support_dynamic_filter(&self) -> bool {
false
}

/// return a new execution plan with dynamic filter
fn with_dynamic_filter(
&self,
_dynamic_filters: Option<Arc<DynamicFilterInfo>>,
) -> Option<Arc<dyn ExecutionPlan>> {
None
}
}

/// Extension trait provides an easy API to fetch various properties of
Expand Down

0 comments on commit 7922441

Please sign in to comment.