Skip to content

Commit

Permalink
use create_cache_convention
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Feb 26, 2024
1 parent 93f5282 commit ace9815
Show file tree
Hide file tree
Showing 46 changed files with 787 additions and 735 deletions.
18 changes: 9 additions & 9 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::*;
use datafusion_expr::{Expr, LogicalPlanBuilder};
use datafusion_physical_expr::EquivalenceProperties;

use async_trait::async_trait;
use tokio::time::timeout;
Expand Down Expand Up @@ -199,22 +200,21 @@ impl CustomExec {
db: CustomDataSource,
) -> Self {
let projected_schema = project_schema(&schema, projections).unwrap();
let cache = PlanPropertiesCache::new_default(projected_schema.clone());
let cache = Self::create_cache(projected_schema.clone());
Self {
db,
projected_schema,
cache,
}
.with_cache()
}

fn with_cache(mut self) -> Self {
self.cache = self
.cache
.with_partitioning(Partitioning::UnknownPartitioning(1))
.with_exec_mode(ExecutionMode::Bounded);

self
/// Builds the plan-properties cache for this operator: equivalence
/// properties derived from `schema`, a single unknown output partition,
/// and bounded execution.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
    PlanPropertiesCache::new(
        EquivalenceProperties::new(schema),
        Partitioning::UnknownPartitioning(1),
        ExecutionMode::Bounded,
    )
}
}

Expand Down
34 changes: 19 additions & 15 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ impl ArrowExec {
pub fn new(base_config: FileScanConfig) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
let cache = PlanPropertiesCache::new_default(projected_schema.clone());
let cache = Self::create_cache(
projected_schema.clone(),
&projected_output_ordering,
&base_config,
);
Self {
base_config,
projected_schema,
Expand All @@ -70,36 +74,36 @@ impl ArrowExec {
metrics: ExecutionPlanMetricsSet::new(),
cache,
}
.with_cache()
}
/// Returns a reference to the `FileScanConfig` this plan was built from.
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}

fn output_partitioning_helper(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}

fn with_cache(mut self) -> Self {
fn create_cache(
schema: SchemaRef,
projected_output_ordering: &[LexOrdering],
file_scan_config: &FileScanConfig,
) -> PlanPropertiesCache {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
);
let eq_properties =
EquivalenceProperties::new_with_orderings(schema, projected_output_ordering);

self.cache = PlanPropertiesCache::new(
PlanPropertiesCache::new(
eq_properties,
self.output_partitioning_helper(), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
);
self
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

/// Replaces the file groups in the base config, refreshing the cached output
/// partitioning (its partition count equals the number of file groups).
fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
    self.base_config.file_groups = file_groups;
    // Changing file groups may invalidate output partitioning. Update it also
    let output_partitioning = Self::output_partitioning_helper(&self.base_config);
    self.cache = self.cache.with_partitioning(output_partitioning);
    self
}
Expand Down
25 changes: 14 additions & 11 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ impl AvroExec {
pub fn new(base_config: FileScanConfig) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
let cache = PlanPropertiesCache::new_default(projected_schema.clone());
let cache = Self::create_cache(
projected_schema.clone(),
&projected_output_ordering,
&base_config,
);
Self {
base_config,
projected_schema,
Expand All @@ -59,27 +63,26 @@ impl AvroExec {
metrics: ExecutionPlanMetricsSet::new(),
cache,
}
.with_cache()
}
/// Returns a reference to the `FileScanConfig` this plan was built from.
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}

fn with_cache(mut self) -> Self {
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
file_scan_config: &FileScanConfig,
) -> PlanPropertiesCache {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
);
let n_partitions = self.base_config.file_groups.len();
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
let n_partitions = file_scan_config.file_groups.len();

self.cache = PlanPropertiesCache::new(
PlanPropertiesCache::new(
eq_properties,
Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
);
self
)
}
}

Expand Down
35 changes: 18 additions & 17 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ use tokio::task::JoinSet;
pub struct CsvExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_output_ordering: Vec<LexOrdering>,
has_header: bool,
delimiter: u8,
quote: u8,
Expand All @@ -77,11 +76,14 @@ impl CsvExec {
) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
let cache = PlanPropertiesCache::new_default(projected_schema);
let cache = Self::create_cache(
projected_schema,
&projected_output_ordering,
&base_config,
);
Self {
base_config,
projected_statistics,
projected_output_ordering,
has_header,
delimiter,
quote,
Expand All @@ -90,7 +92,6 @@ impl CsvExec {
file_compression_type,
cache,
}
.with_cache()
}

/// Ref to the base configs
Expand All @@ -116,29 +117,29 @@ impl CsvExec {
self.escape
}

fn output_partitioning_helper(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}

fn with_cache(mut self) -> Self {
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
file_scan_config: &FileScanConfig,
) -> PlanPropertiesCache {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
);
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

self.cache = PlanPropertiesCache::new(
PlanPropertiesCache::new(
eq_properties,
self.output_partitioning_helper(), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
);
self
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

/// Replaces the file groups in the base config, refreshing the cached output
/// partitioning (its partition count equals the number of file groups).
fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
    self.base_config.file_groups = file_groups;
    // Changing file groups may invalidate output partitioning. Update it also
    let output_partitioning = Self::output_partitioning_helper(&self.base_config);
    self.cache = self.cache.with_partitioning(output_partitioning);
    self
}
Expand Down
35 changes: 18 additions & 17 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use tokio::task::JoinSet;
pub struct NdJsonExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
file_compression_type: FileCompressionType,
Expand All @@ -67,46 +66,48 @@ impl NdJsonExec {
) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
let cache = PlanPropertiesCache::new_default(projected_schema);
let cache = Self::create_cache(
projected_schema,
&projected_output_ordering,
&base_config,
);
Self {
base_config,
projected_statistics,
projected_output_ordering,
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
cache,
}
.with_cache()
}

/// Returns a reference to the `FileScanConfig` this plan was built from.
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}

fn output_partitioning_helper(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}

fn with_cache(mut self) -> Self {
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
file_scan_config: &FileScanConfig,
) -> PlanPropertiesCache {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
);
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

self.cache = PlanPropertiesCache::new(
PlanPropertiesCache::new(
eq_properties,
self.output_partitioning_helper(), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
);
self
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

/// Replaces the file groups in the base config, refreshing the cached output
/// partitioning (its partition count equals the number of file groups).
fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
    self.base_config.file_groups = file_groups;
    // Changing file groups may invalidate output partitioning. Update it also
    let output_partitioning = Self::output_partitioning_helper(&self.base_config);
    self.cache = self.cache.with_partitioning(output_partitioning);
    self
}
Expand Down
35 changes: 18 additions & 17 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ pub struct ParquetExec {
/// Base configuration for this scan
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_output_ordering: Vec<LexOrdering>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
Expand Down Expand Up @@ -149,15 +148,18 @@ impl ParquetExec {

let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
let cache = PlanPropertiesCache::new_default(projected_schema);
let cache = Self::create_cache(
projected_schema,
&projected_output_ordering,
&base_config,
);
Self {
pushdown_filters: None,
reorder_filters: None,
enable_page_index: None,
enable_bloom_filter: None,
base_config,
projected_statistics,
projected_output_ordering,
metrics,
predicate,
pruning_predicate,
Expand All @@ -166,7 +168,6 @@ impl ParquetExec {
parquet_file_reader_factory: None,
cache,
}
.with_cache()
}

/// Ref to the base configs
Expand Down Expand Up @@ -261,29 +262,29 @@ impl ParquetExec {
.unwrap_or(config_options.execution.parquet.bloom_filter_enabled)
}

fn output_partitioning_helper(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
fn output_partitioning_helper(file_config: &FileScanConfig) -> Partitioning {
Partitioning::UnknownPartitioning(file_config.file_groups.len())
}

fn with_cache(mut self) -> Self {
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
file_config: &FileScanConfig,
) -> PlanPropertiesCache {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
);
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

self.cache = PlanPropertiesCache::new(
PlanPropertiesCache::new(
eq_properties,
self.output_partitioning_helper(), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
);
self
Self::output_partitioning_helper(file_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
)
}

/// Replaces the file groups in the base config, refreshing the cached output
/// partitioning (its partition count equals the number of file groups).
fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
    self.base_config.file_groups = file_groups;
    // Changing file groups may invalidate output partitioning. Update it also
    let output_partitioning = Self::output_partitioning_helper(&self.base_config);
    self.cache = self.cache.with_partitioning(output_partitioning);
    self
}
Expand Down
Loading

0 comments on commit ace9815

Please sign in to comment.