Feature/single source exec #56
Conversation
fix csv_json example
# Conflicts:
#   datafusion/physical-plan/src/aggregates/mod.rs
# Conflicts:
#   datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
#   datafusion/physical-plan/src/memory.rs
# Conflicts:
#   datafusion/core/src/datasource/file_format/arrow.rs
#   datafusion/core/src/datasource/file_format/csv.rs
#   datafusion/core/src/datasource/file_format/json.rs
#   datafusion/core/src/datasource/file_format/parquet.rs
#   datafusion/core/src/datasource/memory.rs
#   datafusion/core/src/test/mod.rs
#   datafusion/physical-plan/src/memory.rs
# Conflicts:
#   datafusion/core/src/datasource/memory.rs
#   datafusion/core/src/datasource/physical_plan/arrow_file.rs
#   datafusion/core/src/datasource/physical_plan/avro.rs
#   datafusion/core/src/datasource/physical_plan/csv.rs
#   datafusion/core/src/datasource/physical_plan/file_scan_config.rs
#   datafusion/core/src/datasource/physical_plan/json.rs
#   datafusion/core/src/datasource/physical_plan/parquet/mod.rs
#   datafusion/core/src/physical_optimizer/enforce_distribution.rs
#   datafusion/core/src/physical_optimizer/enforce_sorting.rs
#   datafusion/core/src/physical_optimizer/sanity_checker.rs
#   datafusion/core/src/physical_optimizer/test_utils.rs
#   datafusion/physical-plan/src/memory.rs
#   datafusion/sqllogictest/test_files/group_by.slt
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    let mut source = Arc::clone(&self.source);
    source = source.with_fetch(limit)?;
    let cache = source.properties().clone();
Do we need to recompute some properties now that a limit is given?
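A minimal sketch of the concern (all names here are hypothetical stand-ins, not the real DataFusion types): if `with_fetch` re-derives the properties from the limited source instead of cloning the old cache, the plan's cached properties cannot go stale.

```rust
// Hypothetical sketch: `with_fetch` returns a new source whose
// properties are recomputed, rather than reusing the stale cache.
#[derive(Clone, Debug, PartialEq)]
struct PlanProperties {
    // Stand-in field; the real struct also carries equivalence
    // properties, partitioning, and execution mode.
    row_limit: Option<usize>,
}

#[derive(Clone)]
struct Source {
    props: PlanProperties,
}

impl Source {
    fn with_fetch(&self, limit: Option<usize>) -> Option<Source> {
        // Sources that cannot apply a limit would return None here.
        limit?;
        // Properties are re-derived from the new limit.
        Some(Source {
            props: PlanProperties { row_limit: limit },
        })
    }
}

fn main() {
    let source = Source {
        props: PlanProperties { row_limit: None },
    };
    let limited = source.with_fetch(Some(10)).unwrap();
    assert_eq!(limited.props, PlanProperties { row_limit: Some(10) });
}
```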
base_config: FileScanConfig,
metrics: ExecutionPlanMetricsSet,
projected_statistics: Statistics,
cache: PlanProperties,
metrics: ExecutionPlanMetricsSet,
projected_statistics: Statistics,
cache: PlanProperties,
Why do we have plan-related information in the config?
I don't think these belong in the config.
Instead of new_exec, I think we should add
DataSourceExec::new(config: FileSourceConfig, source: Arc<dyn FileSource>)
If we change any property in the config, like the file group, we should only change the config itself; the DataSourceExec that relies on the config should take care of picking up the update by itself.
@jayzhan-synnada It was because of some shared logic, like repartitioning. But I understand your point; I've moved the statistics & metrics into the lower-level configurations and moved the cache up into DataSourceExec.
I've created new_exec as syntactic sugar to avoid duplicating Arc::new(DataSourceExec::new(FileSourceConfig::new(base_config, file_source))), since the config can be either FileSourceConfig or MemorySourceConfig.
Can you please review again?
The current dependency structure places the data source/file layer above the physical plan layer. However, a function like FileSourceConfig::new() -> DataSourceExec violates this dependency hierarchy, potentially making the modules harder to decouple. In apache#10782, we might want to have separate modules for catalog / file & data source / table and so on. The dependency is like:
Catalog -> Schema -> Table -> FileFormat -> QueryPlanner
If we move FileSourceConfig into a FileFormat crate, we wouldn't expect to import a Planner struct there, but FileSourceConfig::new_exec has to. In the other direction, it is fine for DataSourceExec::new() to import FileSourceConfig.
Also, I still think plan properties should be computed when we call DataSourceExec::new(), but currently we compute them when the config is created. The File/DataSource and PhysicalPlan abstraction is not clear enough to me 🤔
Is it possible to unify memory and file into a single concept?
> Is it possible to unify memory and file into a single concept?

I believe the DataSource trait is the best we can get, since their open implementations are totally different, and there is some file-based shared logic in FileSourceConfig.
@@ -172,8 +173,7 @@ impl FileFormat for ArrowFormat {
         conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = ArrowExec::new(conf);
-        Ok(Arc::new(exec))
+        Ok(FileSourceConfig::new_exec(conf, Arc::new(ArrowConfig {})))
Therefore, I expect we have
DataSourceExec::new(config: FileSourceConfig, source: Arc<dyn FileSource>)
or maybe we don't need FileSourceConfig at all and can just pass Arc<dyn FileSource> and FileScanConfig around.
PlanProperties should be computed inside DataSourceExec::new. Any change to the source config (e.g. with_fetch or file groups) triggers re-computation of the properties if required.
Computing properties is common, plan-related behaviour and should be moved to DataSourceExec.
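The shape being proposed could look like the sketch below. All type and method names are simplified stand-ins for illustration, not the real DataFusion API: the point is only that the exec derives its cached properties in `new`, so rebuilding the exec after any config change re-derives them.

```rust
// Sketch: DataSourceExec computes its plan properties once in `new`,
// from whatever the wrapped DataSource reports.
#[derive(Clone, Debug, PartialEq)]
enum Partitioning {
    UnknownPartitioning(usize),
}

trait DataSource {
    // Stand-in for the full property set (partitioning, orderings, ...).
    fn output_partitioning(&self) -> Partitioning;
}

struct MemorySource {
    partitions: usize,
}

impl DataSource for MemorySource {
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions)
    }
}

struct DataSourceExec {
    source: Box<dyn DataSource>,
    // Cached properties, derived in `new`, never at config-creation time.
    cache: Partitioning,
}

impl DataSourceExec {
    fn new(source: Box<dyn DataSource>) -> Self {
        let cache = source.output_partitioning();
        DataSourceExec { source, cache }
    }
}

fn main() {
    let exec = DataSourceExec::new(Box::new(MemorySource { partitions: 4 }));
    assert_eq!(exec.cache, Partitioning::UnknownPartitioning(4));
    let _ = &exec.source; // exec retains the source for execution
}
```

With this shape, any `with_*` builder that swaps the source just calls `DataSourceExec::new` again, and the cache can never disagree with the config.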
Moved compute_properties into DataSourceExec 👍
As we discussed, to move new_exec to DataSourceExec we would need to be able to import FileSourceConfig and FileScanConfig in datafusion/physical-plan, but they're in datafusion/core at the moment. Since, as you mentioned, this is a nice-to-have and would require much more code movement, I'm skipping it for now and just recording it. I'll also mention this when we open the PR to upstream, so maybe the community can come up with a better idea.
Here's what we want to achieve:
impl DataSourceExec {
    pub fn from_file_config(
        base_config: FileScanConfig,
        file_source: Arc<dyn FileSource>,
    ) -> Arc<DataSourceExec> {
        let source = Arc::new(FileSourceConfig::new(base_config, file_source));
        Arc::new(Self::new(source))
    }
}
make cache a part of DataSourceExec
# Conflicts:
#   datafusion/core/src/dataframe/mod.rs
@@ -261,64 +245,43 @@ impl DataSource for FileSourceConfig {
     fn repartitioned(
Could this be a method on DataSourceExec? It is actually transforming one DataSourceExec into another one with some configuration information, like partitioning or file groups.
I have a bold assumption: maybe trait DataSource is not required, and such methods should belong to DataSourceExec instead.
In AggregateExec, we have StreamType to differentiate which exec stream we need to run.
For DataSource there are only MemoryStream and FileStream, I guess, so DataSource is probably not required at all:
enum StreamType {
    AggregateStream(AggregateStream),
    GroupedHash(GroupedHashAggregateStream),
    GroupedPriorityQueue(GroupedTopKAggregateStream),
}
We can choose the stream type based on some configuration or properties:
enum DataSourceExecType {
    File,
    Memory,
}
fn execute_typed(&self, partition: usize, context: Arc<TaskContext>) -> SendableRecordBatchStream {
    if self.source.source_type() == DataSourceExecType::File {
        // build a FileStream
    } else {
        // build a MemoryStream
    }
}
I think repartitioned can be restructured like this:
trait DataSource {
    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
    ) -> Result<Option<Self>>
    where
        Self: Sized;

    fn output_partitioning(&self) -> Partitioning;
}

impl ExecutionPlan for DataSourceExec {
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
        let source = self.source.repartitioned(
            target_partitions,
            config.optimizer.repartition_file_min_size,
        )?;
        // Other metadata from DataSource
        let output_partitioning = source.output_partitioning();
        self.clone().with_source(source).with_partitioning(output_partitioning)
    }
}
I think a dependency like this makes more sense:
- trait DataSource deals with the source-related definitions that differ per source type, like partitioning
- DataSourceExec computes the plan based on the DataSource's properties, passed as function parameters
        file_scan_config: &FileScanConfig,
    ) -> PlanProperties {
        // Equivalence Properties
-       let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
+       let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)
To compute properties, what we need are Partitioning and EquivalenceProperties. We can add DataSource trait methods that hand them to DataSourceExec.
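A sketch of that split, with simplified stand-in types (the real `Partitioning` and `EquivalenceProperties` are much richer): the trait only exposes the two inputs, and a source-agnostic helper on the exec side combines them.

```rust
// Stand-ins for the real property types, for illustration only.
#[derive(Debug, PartialEq)]
struct Partitioning(usize);

#[derive(Debug, PartialEq)]
struct EquivalenceProperties(Vec<String>); // ordering column names

// The trait hands DataSourceExec exactly what it needs, nothing more.
trait DataSource {
    fn output_partitioning(&self) -> Partitioning;
    fn eq_properties(&self) -> EquivalenceProperties;
}

struct FileSourceConfig {
    file_groups: usize,
    order_col: String,
}

impl DataSource for FileSourceConfig {
    fn output_partitioning(&self) -> Partitioning {
        // One partition per file group, as a simplification.
        Partitioning(self.file_groups)
    }
    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties(vec![self.order_col.clone()])
    }
}

// Lives on the DataSourceExec side; unaware of the concrete source type.
fn compute_properties(source: &dyn DataSource) -> (EquivalenceProperties, Partitioning) {
    (source.eq_properties(), source.output_partitioning())
}

fn main() {
    let config = FileSourceConfig { file_groups: 3, order_col: "ts".into() };
    let (eq, part) = compute_properties(&config);
    assert_eq!(part, Partitioning(3));
    assert_eq!(eq, EquivalenceProperties(vec!["ts".to_string()]));
}
```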
@@ -89,7 +89,7 @@ This:
 2. Constructs the individual output arrays (columns)
 3. Returns a `MemoryStream` of a single `RecordBatch` with the arrays

-I.e. returns the "physical" data. For other examples, refer to the [`CsvExec`][csv] and [`ParquetExec`][parquet] for more complex implementations.
+I.e. returns the "physical" data. For other examples, refer to the [`CsvConfig`][csv] and [`ParquetConfig`][parquet] for more complex implementations.
These link to the old implementation and might be confusing. There might be other places in the docs or comments where a blanket find-and-replace has broken logical consistency, because ParquetExec and ParquetConfig aren't exactly the same thing.
datafusion-upstream/docs/source/library-user-guide/custom-table-providers.md
Lines 179 to 181 in cb8c2ae
[ex]: https://github.com/apache/datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion-examples/examples/custom_datasource.rs#L214C1-L276
[csv]: https://github.com/apache/datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion/core/src/datasource/physical_plan/csv.rs#L57-L70
[parquet]: https://github.com/apache/datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion/core/src/datasource/physical_plan/parquet.rs#L77-L104
I will note this in the upstream PR as well. Once merged I think we can update these with a follow-up PR.
let data_type = [
    ("avro", file_source.downcast_ref::<AvroConfig>().is_some()),
    ("arrow", file_source.downcast_ref::<ArrowConfig>().is_some()),
    ("csv", file_source.downcast_ref::<CsvConfig>().is_some()),
    ("json", file_source.downcast_ref::<JsonConfig>().is_some()),
    #[cfg(feature = "parquet")]
    (
        "parquet",
        file_source.downcast_ref::<ParquetConfig>().is_some(),
    ),
]
It might make more sense to use something like self.source.data_type(), because ideally the implementation here should be unaware of the underlying FileSource implementation. It would also make adding new file formats easier.
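A sketch of the suggestion (method name `file_type` is an assumption for illustration): each format reports its own label through the trait, so the call site drops the downcast chain and new formats need no changes there.

```rust
// Hypothetical trait accessor replacing the per-format downcast list.
trait FileSource {
    fn file_type(&self) -> &str;
}

struct CsvConfig;
impl FileSource for CsvConfig {
    fn file_type(&self) -> &str {
        "csv"
    }
}

struct JsonConfig;
impl FileSource for JsonConfig {
    fn file_type(&self) -> &str {
        "json"
    }
}

// The call site no longer knows which concrete configs exist.
fn describe(source: &dyn FileSource) -> String {
    format!("file_type={}", source.file_type())
}

fn main() {
    assert_eq!(describe(&CsvConfig), "file_type=csv");
    assert_eq!(describe(&JsonConfig), "file_type=json");
}
```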
if let Some(csv_conf) = self.source.as_any().downcast_ref::<CsvConfig>() {
    return write!(f, ", has_header={}", csv_conf.has_header);
}

#[cfg(feature = "parquet")]
if let Some(parquet_conf) = self.source.as_any().downcast_ref::<ParquetConfig>() {
    return match t {
Same as above; maybe something like self.source.fmt_extra(f). These details would be better contained in their own implementations.
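The `fmt_extra` idea could look like this sketch (trait and method names are assumptions): a defaulted trait hook lets each source append its own details, so the exec's `Display` implementation stays format-agnostic.

```rust
use std::fmt;

// Hypothetical `fmt_extra` hook with a no-op default.
trait FileSource {
    fn fmt_extra(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
        Ok(()) // most sources have nothing extra to print
    }
}

struct CsvConfig {
    has_header: bool,
}

impl FileSource for CsvConfig {
    fn fmt_extra(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, ", has_header={}", self.has_header)
    }
}

struct DataSourceExec<S: FileSource> {
    source: S,
}

impl<S: FileSource> fmt::Display for DataSourceExec<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // No downcasts: the source prints its own extras.
        write!(f, "DataSourceExec")?;
        self.source.fmt_extra(f)
    }
}

fn main() {
    let exec = DataSourceExec {
        source: CsvConfig { has_header: true },
    };
    assert_eq!(exec.to_string(), "DataSourceExec, has_header=true");
}
```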
…t_partitioning in DataSource trait
create fmt_extra method
# Conflicts:
#   datafusion/core/src/physical_planner.rs
👍
Opened upstream, closing this one: apache#14224
Which issue does this PR close?
Closes apache#13838.
Rationale for this change
This PR merges all data sources into one execution plan, named DataSourceExec, with a single trait named DataSource, which is inspired by DataSink.
This version is not intended to be merged upstream as-is, since it removes ParquetExec, CsvExec, etc. and changes all the tests, but I'm sharing it so that we can see all possible changes. Our main intention is to re-add the old execution plans as deprecated ones and implement this part by part to keep API stability.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?