
Feature/single source exec #56

Closed
wants to merge 47 commits

Conversation

mertak-synnada

Which issue does this PR close?

Closes apache#13838.

Rationale for this change

This PR merges all data sources into a single execution plan named DataSourceExec, backed by a single trait named DataSource, which is inspired by DataSink.

This version is not intended to be merged upstream as-is, since it removes ParquetExec, CsvExec, etc. and changes all the tests; I'm sharing it in this form so that we can see the full set of changes. Our main intention is to re-add the old execution plans as deprecated ones and implement this part by part to keep API stability.
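The unification described above can be sketched in a very reduced form. The names mirror the PR (DataSource, DataSourceExec), but everything else is a hypothetical stand-in — plain `Vec<i64>` instead of record-batch streams — so the sketch compiles standalone and should not be read as the real DataFusion API.

```rust
/// Stand-in for the `DataSource` trait: each concrete source (memory, file,
/// ...) knows how to open itself, while the single exec node below stays
/// source-agnostic.
trait DataSource {
    fn name(&self) -> &str;
    /// Stand-in for producing a record-batch stream for one partition.
    fn open(&self, partition: usize) -> Vec<i64>;
}

struct MemorySource {
    partitions: Vec<Vec<i64>>,
}

impl DataSource for MemorySource {
    fn name(&self) -> &str {
        "memory"
    }
    fn open(&self, partition: usize) -> Vec<i64> {
        self.partitions[partition].clone()
    }
}

/// Stand-in for `DataSourceExec`: one plan node delegating to any `DataSource`.
struct DataSourceExec {
    source: Box<dyn DataSource>,
}

impl DataSourceExec {
    fn new(source: Box<dyn DataSource>) -> Self {
        Self { source }
    }
    fn execute(&self, partition: usize) -> Vec<i64> {
        self.source.open(partition)
    }
}

fn main() {
    let exec = DataSourceExec::new(Box::new(MemorySource {
        partitions: vec![vec![1, 2], vec![3]],
    }));
    // The exec node never needs to know which concrete source it wraps.
    assert_eq!(exec.execute(0), vec![1, 2]);
    println!("read {} rows from {}", exec.execute(0).len(), exec.source.name());
}
```

The point of the shape is that adding a new source type means implementing one trait, not adding a new ExecutionPlan.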

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@mertak-synnada mertak-synnada changed the title Chore/single source exec Feature/single source exec Jan 7, 2025
@mertak-synnada mertak-synnada marked this pull request as draft January 7, 2025 14:27
fix csv_json example
# Conflicts:
#	datafusion/physical-plan/src/aggregates/mod.rs
# Conflicts:
#	datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
#	datafusion/physical-plan/src/memory.rs
# Conflicts:
#	datafusion/core/src/datasource/file_format/arrow.rs
#	datafusion/core/src/datasource/file_format/csv.rs
#	datafusion/core/src/datasource/file_format/json.rs
#	datafusion/core/src/datasource/file_format/parquet.rs
#	datafusion/core/src/datasource/memory.rs
#	datafusion/core/src/test/mod.rs
#	datafusion/physical-plan/src/memory.rs
# Conflicts:
#	datafusion/core/src/datasource/memory.rs
#	datafusion/core/src/datasource/physical_plan/arrow_file.rs
#	datafusion/core/src/datasource/physical_plan/avro.rs
#	datafusion/core/src/datasource/physical_plan/csv.rs
#	datafusion/core/src/datasource/physical_plan/file_scan_config.rs
#	datafusion/core/src/datasource/physical_plan/json.rs
#	datafusion/core/src/datasource/physical_plan/parquet/mod.rs
#	datafusion/core/src/physical_optimizer/enforce_distribution.rs
#	datafusion/core/src/physical_optimizer/enforce_sorting.rs
#	datafusion/core/src/physical_optimizer/sanity_checker.rs
#	datafusion/core/src/physical_optimizer/test_utils.rs
#	datafusion/physical-plan/src/memory.rs
#	datafusion/sqllogictest/test_files/group_by.slt
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let mut source = Arc::clone(&self.source);
source = source.with_fetch(limit)?;
let cache = source.properties().clone();

Do we need to recompute some properties now that a limit is given?

base_config: FileScanConfig,
metrics: ExecutionPlanMetricsSet,
projected_statistics: Statistics,
cache: PlanProperties,

@jayzhan-synnada jayzhan-synnada Jan 20, 2025


metrics: ExecutionPlanMetricsSet,
projected_statistics: Statistics,
cache: PlanProperties,

Why do we have plan-related information in the config?

I don't think it belongs there.

Instead of new_exec, I think we should add

DataSourceExec::new(config: FileSourceConfig, source: Arc<dyn FileSource>)

If we change any property in the config, such as a file group, we should only change the config itself; the DataSourceExec that relies on the config should pick up the update by itself.

Author

@jayzhan-synnada It was because of some shared logic like repartitioning. But I understand your point; I've moved the statistics & metrics into the lower-level configurations and moved the cache up into DataSourceExec.

I created new_exec as syntactic sugar to avoid duplicating Arc::new(DataSourceExec::new(FileSourceConfig::new(base_config, file_source))), since the config can be either FileSourceConfig or MemorySourceConfig.

Can you please review again?

@jayzhan-synnada jayzhan-synnada Jan 21, 2025


The current dependency structure places the data source/file layer above the physical plan layer. However, the function FileSourceConfig::new() -> DataSourceExec violates this dependency hierarchy, potentially making the modules harder to decouple. In apache#10782 we might want separate modules for catalog, file & data source, table, and so on. The dependency chain is

Catalog -> Schema -> Table -> FileFormat -> QueryPlanner.

If we move FileSourceConfig into a FileFormat crate, we don't expect to import planner structs there, but FileSourceConfig::new_exec has to. In the other direction, it is fine for DataSourceExec::new() to import FileSourceConfig.

Also, I still think the plan properties should be computed when we call DataSourceExec::new(), but currently we compute them when the config is created. The File/DataSource vs. PhysicalPlan abstraction is not clear enough to me 🤔

@jayzhan-synnada jayzhan-synnada Jan 21, 2025


Is it possible to unify memory and file into a single concept?

Author

Is it possible to unify memory and file into a single concept?

I believe the DataSource trait is the best we can get, since their open implementations are totally different and some file-based shared logic lives in FileSourceConfig.

@@ -172,8 +173,7 @@ impl FileFormat for ArrowFormat {
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = ArrowExec::new(conf);
Ok(Arc::new(exec))
Ok(FileSourceConfig::new_exec(conf, Arc::new(ArrowConfig {})))

@jayzhan-synnada jayzhan-synnada Jan 20, 2025


Therefore, I expect we have

DataSourceExec::new(config: FileSourceConfig, source: Arc<dyn FileSource>)

or maybe we don't need FileSourceConfig at all, and can just pass Arc<dyn FileSource> and FileScanConfig around.

PlanProperties should be computed inside DataSourceExec::new. Any change to the source config (e.g. with_fetch or file groups) should trigger recomputation of the properties if required. compute_properties is common plan-related behaviour and should move to DataSourceExec.

Author

Moved compute_properties into DataSourceExec 👍

As we discussed, moving new_exec to DataSourceExec requires importing FileSourceConfig and FileScanConfig in datafusion/physical-plan, but they live in datafusion/core at the moment. Since, as you mentioned, this is a nice-to-have that would require much more code movement, I'm skipping it for now and just recording it here. I'll also mention it when we open the PR upstream, so maybe the community can come up with a better idea.

Here's what we want to achieve:

impl DataSourceExec {
    pub fn from_file_config(base_config: FileScanConfig, file_source: Arc<dyn FileSource>) -> Arc<DataSourceExec> {
        let source = Arc::new(FileSourceConfig::new(base_config, file_source));
        Arc::new(Self::new(source))
    }
}

@@ -261,64 +245,43 @@ impl DataSource for FileSourceConfig {
fn repartitioned(


Could this be a method in DataSourceExec? It is actually transforming one DataSourceExec into another with some configuration information like partitioning or file groups.


@jayzhan-synnada jayzhan-synnada Jan 21, 2025


I have a bold assumption: maybe the DataSource trait is not required, and such methods should belong to DataSourceExec instead.

In AggregateExec, we have StreamType to differentiate the exec stream we need to run:

For DataSource there are only MemoryStream and FileStream, I guess, so the trait is probably unnecessary:

enum StreamType {
    AggregateStream(AggregateStream),
    GroupedHash(GroupedHashAggregateStream),
    GroupedPriorityQueue(GroupedTopKAggregateStream),
}

We can choose the stream type based on some configuration or properties:

    enum DataSourceExecType {
        File,
        Memory,
    }

    fn execute_typed(&self, partition: usize, context: Arc<TaskContext>) -> SendableRecordBatchStream {
        if self.source.source_type() == File {
            FileStream
        } else {
            MemoryStream
        }
    }
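The enum-dispatch alternative sketched above can be made concrete in a standalone form. All names here are hypothetical stand-ins (streams are plain `Vec<i64>`), showing only the dispatch shape: the exec node matches on a source-type tag instead of calling through a `DataSource` trait object.

```rust
// Hypothetical sketch: source kinds as an enum, not a trait.
enum SourceType {
    File,
    Memory,
}

// Stand-ins for FileStream / MemoryStream.
enum SourceStream {
    File(Vec<i64>),
    Memory(Vec<i64>),
}

struct DataSourceExec {
    source_type: SourceType,
    data: Vec<i64>,
}

impl DataSourceExec {
    /// Mirrors the `execute_typed` idea above: pick the stream variant
    /// from the source type instead of dynamic dispatch.
    fn execute_typed(&self) -> SourceStream {
        match self.source_type {
            SourceType::File => SourceStream::File(self.data.clone()),
            SourceType::Memory => SourceStream::Memory(self.data.clone()),
        }
    }
}

fn main() {
    let exec = DataSourceExec {
        source_type: SourceType::Memory,
        data: vec![7, 8],
    };
    match exec.execute_typed() {
        SourceStream::Memory(v) => assert_eq!(v, vec![7, 8]),
        SourceStream::File(_) => unreachable!(),
    }
}
```

The trade-off is the usual one: an enum gives a closed, easily matched set of sources, while a trait keeps the set open for third-party sources.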


I think repartitioned can be restructured like this:

impl DataSource {
   fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize
   ) -> Result<Option<Self>> {}

   fn output_partition(&self) -> Partitioning;
}

impl ExecutionPlan for DataSourceExec {
   fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
       let source = self.source.repartitioned();
       let output_partitioning = source.output_partitioning();
       // Other metadata from DataSource
        self.clone().with_source().with_partitioning()
    }
}

I think a dependency split like this makes more sense:

the DataSource trait deals with the source-specific definitions that differ for each source type, like partitioning, and DataSourceExec computes the plan properties from what the DataSource exposes as function parameters.
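A compiling, simplified version of the split sketched above — names and types are hypothetical stand-ins (partitioning is a plain `usize`, files are byte sizes), not the real DataFusion signatures. The source only knows how to re-split its own data; the exec node owns the plan-level partitioning and recomputes it after repartitioning.

```rust
struct FileSource {
    file_sizes: Vec<usize>,
}

impl FileSource {
    /// Re-split the files into up to `target_partitions` groups; returns
    /// None when repartitioning is not possible or not worthwhile
    /// (stand-in for the trait method proposed above).
    fn repartitioned(&self, target_partitions: usize, min_size: usize) -> Option<FileSource> {
        let total: usize = self.file_sizes.iter().sum();
        if total < min_size || target_partitions <= self.file_sizes.len() {
            return None;
        }
        // Naive even split of the total byte range.
        let chunk = (total + target_partitions - 1) / target_partitions;
        Some(FileSource {
            file_sizes: vec![chunk; target_partitions],
        })
    }

    fn output_partitioning(&self) -> usize {
        self.file_sizes.len()
    }
}

struct DataSourceExec {
    source: FileSource,
    partitioning: usize, // stand-in for PlanProperties
}

impl DataSourceExec {
    fn new(source: FileSource) -> Self {
        // Plan properties are computed here, at plan-construction time.
        let partitioning = source.output_partitioning();
        Self { source, partitioning }
    }

    fn repartitioned(&self, target_partitions: usize) -> Option<DataSourceExec> {
        let source = self.source.repartitioned(target_partitions, 1)?;
        // Recomputing properties lives in the exec node, not the source.
        Some(DataSourceExec::new(source))
    }
}

fn main() {
    let exec = DataSourceExec::new(FileSource { file_sizes: vec![100, 50] });
    assert_eq!(exec.partitioning, 2);
    let repartitioned = exec.repartitioned(4).unwrap();
    assert_eq!(repartitioned.partitioning, 4);
}
```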

file_scan_config: &FileScanConfig,
) -> PlanProperties {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)


To compute the plan properties, what we need are the Partitioning and the EquivalenceProperties.

We can add DataSource trait methods that expose them to DataSourceExec.

@@ -89,7 +89,7 @@ This:
2. Constructs the individual output arrays (columns)
3. Returns a `MemoryStream` of a single `RecordBatch` with the arrays

I.e. returns the "physical" data. For other examples, refer to the [`CsvExec`][csv] and [`ParquetExec`][parquet] for more complex implementations.
I.e. returns the "physical" data. For other examples, refer to the [`CsvConfig`][csv] and [`ParquetConfig`][parquet] for more complex implementations.

@alihan-synnada alihan-synnada Jan 21, 2025


These link to the old implementations and might be confusing. There might be other places in the docs or comments where a blanket find-and-replace has broken logical consistency, because ParquetExec and ParquetConfig aren't exactly the same thing.

[ex]: https://github.com/apache/datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion-examples/examples/custom_datasource.rs#L214C1-L276
[csv]: https://github.com/apache/datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion/core/src/datasource/physical_plan/csv.rs#L57-L70
[parquet]: https://github.com/apache/datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion/core/src/datasource/physical_plan/parquet.rs#L77-L104

Author

I will note this in the upstream PR as well. Once merged, I think we can update these links in a follow-up PR.

Comment on lines 109 to 119
let data_type = [
("avro", file_source.downcast_ref::<AvroConfig>().is_some()),
("arrow", file_source.downcast_ref::<ArrowConfig>().is_some()),
("csv", file_source.downcast_ref::<CsvConfig>().is_some()),
("json", file_source.downcast_ref::<JsonConfig>().is_some()),
#[cfg(feature = "parquet")]
(
"parquet",
file_source.downcast_ref::<ParquetConfig>().is_some(),
),
]


It might make more sense to use something like self.source.data_type(), because ideally the implementation here should be unaware of the underlying FileSource implementation. That would also make adding new file formats easier.

Comment on lines 197 to 203
if let Some(csv_conf) = self.source.as_any().downcast_ref::<CsvConfig>() {
return write!(f, ", has_header={}", csv_conf.has_header);
}

#[cfg(feature = "parquet")]
if let Some(parquet_conf) = self.source.as_any().downcast_ref::<ParquetConfig>() {
return match t {


Same as above; maybe something like self.source.fmt_extra(f). These format-specific details would be better contained in their own implementations.
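Both downcast sites above can be replaced by trait methods. A standalone sketch of that suggestion — `file_type` and `fmt_extra` are the hypothetical method names floated in these comments, and the types are simplified stand-ins, not the real DataFusion traits:

```rust
use std::fmt;

// Stand-in for the FileSource trait with the two suggested methods.
trait FileSource {
    /// Short format name, replacing the chain of downcast_ref checks.
    fn file_type(&self) -> &str;
    /// Format-specific display additions; most formats add nothing.
    fn fmt_extra(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
        Ok(())
    }
}

struct CsvConfig {
    has_header: bool,
}

impl FileSource for CsvConfig {
    fn file_type(&self) -> &str {
        "csv"
    }
    fn fmt_extra(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, ", has_header={}", self.has_header)
    }
}

/// Shared display code: never inspects the concrete source type.
struct SourceDisplay<'a>(&'a dyn FileSource);

impl fmt::Display for SourceDisplay<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "file_type={}", self.0.file_type())?;
        self.0.fmt_extra(f)
    }
}

fn main() {
    let s = format!("{}", SourceDisplay(&CsvConfig { has_header: true }));
    assert_eq!(s, "file_type=csv, has_header=true");
}
```

A new format then only implements the trait; the shared display path and any `#[cfg(feature = ...)]` gating stay in the format's own module.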


@jayzhan-synnada jayzhan-synnada left a comment


👍

@mertak-synnada
Author

Opened upstream, closing this one: apache#14224
