feat: Parallel collecting parquet files statistics #7573 #7595

Merged
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
@@ -251,6 +251,9 @@ config_namespace! {
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32
}
}
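
The new option is registered alongside the other `datafusion.execution.*` settings, so it should be adjustable per session rather than recompiled. A hedged sketch, assuming DataFusion's usual `SET` statement syntax for execution options:

```sql
-- Raise the metadata-fetch fan-out from the default of 32.
-- Higher values help on high-latency object stores; lower values
-- reduce the burst of concurrent requests during planning.
SET datafusion.execution.meta_fetch_concurrency = 64;
```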

5 changes: 1 addition & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -71,9 +71,6 @@ use crate::physical_plan::{
Statistics,
};

/// The number of files to read in parallel when inferring schema
const SCHEMA_INFERENCE_CONCURRENCY: usize = 32;

/// The Apache Parquet `FileFormat` implementation
///
/// Note it is recommended these are instead configured on the [`ConfigOptions`]
@@ -177,7 +174,7 @@ impl FileFormat for ParquetFormat {
let schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint))
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
.buffered(SCHEMA_INFERENCE_CONCURRENCY)
.buffered(state.config_options().execution.meta_fetch_concurrency)
.try_collect()
.await?;
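
The `buffered(n)` combinator used here caps how many fetches are in flight at once while still yielding results in input order. The real code drives this with `futures::stream`; as a rough standard-library-only analogy (the `fetch_stats` function below is a hypothetical stand-in for a per-file metadata fetch), bounded fan-out with ordered results can be sketched as:

```rust
use std::thread;

// Hypothetical stand-in for fetching one file's metadata.
fn fetch_stats(path: &str) -> (String, usize) {
    (path.to_string(), path.len())
}

/// Run at most `concurrency` fetches at a time, returning results
/// in input order (the property `StreamExt::buffered` preserves).
fn fetch_all_bounded(paths: Vec<String>, concurrency: usize) -> Vec<(String, usize)> {
    let mut results = Vec::new();
    for chunk in paths.chunks(concurrency.max(1)) {
        // Spawn one worker per file in the current window...
        let handles: Vec<_> = chunk
            .iter()
            .cloned()
            .map(|p| thread::spawn(move || fetch_stats(&p)))
            .collect();
        // ...and join in spawn order, so output order matches input order.
        for h in handles {
            results.push(h.join().expect("worker panicked"));
        }
    }
    results
}
```

This is only an illustration of the concurrency shape: the async version overlaps waiting on I/O instead of spawning OS threads, which is why a fan-out of 32 is cheap there.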

64 changes: 35 additions & 29 deletions datafusion/core/src/datasource/listing/table.rs
@@ -934,38 +934,44 @@ impl ListingTable {
let file_list = stream::iter(file_list).flatten();

// collect the statistics if required by the config
let files = file_list.then(|part_file| async {
let part_file = part_file?;
let mut statistics_result = Statistics::default();
if self.options.collect_stat {
let statistics_cache = self.collected_statistics.clone();
match statistics_cache.get_with_extra(
&part_file.object_meta.location,
&part_file.object_meta,
) {
Some(statistics) => statistics_result = statistics.as_ref().clone(),
None => {
let statistics = self
.options
.format
.infer_stats(
ctx,
&store,
self.file_schema.clone(),
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
let mut statistics_result = Statistics::default();
if self.options.collect_stat {
let statistics_cache = self.collected_statistics.clone();
match statistics_cache.get_with_extra(
&part_file.object_meta.location,
&part_file.object_meta,
) {
Some(statistics) => {
statistics_result = statistics.as_ref().clone()
}
None => {
let statistics = self
.options
.format
.infer_stats(
ctx,
&store,
self.file_schema.clone(),
&part_file.object_meta,
)
.await?;
statistics_cache.put_with_extra(
&part_file.object_meta.location,
statistics.clone().into(),
&part_file.object_meta,
)
.await?;
statistics_cache.put_with_extra(
&part_file.object_meta.location,
statistics.clone().into(),
&part_file.object_meta,
);
statistics_result = statistics;
);
statistics_result = statistics;
}
}
}
}
Ok((part_file, statistics_result)) as Result<(PartitionedFile, Statistics)>
});
Ok((part_file, statistics_result))
as Result<(PartitionedFile, Statistics)>
})
.boxed()
.buffered(ctx.config_options().execution.meta_fetch_concurrency);

let (files, statistics) =
get_statistics_with_limit(files, self.schema(), limit).await?;
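
The rewritten closure keeps the existing cache-or-compute logic: look up previously collected statistics for a file, and only run (then cache) the expensive `infer_stats` call on a miss. The real cache (`get_with_extra`/`put_with_extra`) also keys on the file's `ObjectMeta` to detect stale entries; a minimal synchronous sketch of just the cache-or-compute shape, with a stdlib `HashMap` and a hypothetical `infer_stats` closure:

```rust
use std::collections::HashMap;

/// Return cached statistics for `path`, computing and caching on a miss.
/// `u64` stands in for the real `Statistics` type.
fn stats_for<F: FnOnce() -> u64>(
    cache: &mut HashMap<String, u64>,
    path: &str,
    infer_stats: F,
) -> u64 {
    if let Some(stats) = cache.get(path) {
        return *stats; // cache hit: skip inference entirely
    }
    let stats = infer_stats(); // cache miss: infer once...
    cache.insert(path.to_string(), stats); // ...and remember it
    stats
}
```

Because each file's lookup-or-infer future is independent, the PR can safely run `meta_fetch_concurrency` of them at once via `buffered`; cache hits complete immediately while misses overlap their I/O.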
1 change: 1 addition & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -150,6 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.meta_fetch_concurrency 32
datafusion.execution.parquet.allow_single_file_parallelism false
datafusion.execution.parquet.bloom_filter_enabled false
datafusion.execution.parquet.bloom_filter_fpp NULL
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -76,6 +76,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |