feat: Parallel collecting parquet files statistics #7573 #7595

Merged

Changes from 3 commits
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
@@ -251,6 +251,11 @@ config_namespace! {
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

/// Number of files to read in parallel when inferring schema and statistics
///
/// Defaults to the number of CPU cores on the system
pub meta_fetch_concurrency: usize, default = num_cpus::get()
Contributor:
I think it might be odd to use the number of CPUs as the value here, since the value that benefits most could be higher than the number of CPUs.
What about defaulting to the fixed value 32 that was used previously?

Contributor Author:
@Dandandan I agree it should be set higher than the CPU count; I actually use `num_cpus::get() * 4` in my project. A fixed 32 has its own problem, though: the user may have 4 CPU cores or 64.

But since this is only a default that users can override, I think 32 is fine.

Contributor:
What is the final plan for this PR? Will we set the default to 32, or shall we leave it at the number of cores?

Contributor Author:
I will change it to 32; that should be better. Let me do that.

Contributor Author:
@alamb Done.

}
}

@@ -690,6 +695,7 @@ impl ConfigOptions {
// Normalize for display
s.execution.target_partitions = 0;
s.execution.planning_concurrency = 0;
s.execution.meta_fetch_concurrency = 0;

let mut docs = "| key | default | description |\n".to_string();
docs += "|-----|---------|-------------|\n";
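At this commit the option defaults to the core count, which the review thread later replaces with a fixed 32. A standard-library-only sketch (the helper name `core_count` is hypothetical) of how the core-count default could be derived without the `num_cpus` crate:

```rust
use std::thread;

/// Hypothetical stand-in for `num_cpus::get()` using only the standard
/// library: the number of logical cores, falling back to 1 if unknown.
fn core_count() -> usize {
    thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
}

fn main() {
    // At this commit the default is the core count; the review thread
    // later settles on a fixed default of 32 instead.
    let meta_fetch_concurrency = core_count();
    println!("meta_fetch_concurrency = {meta_fetch_concurrency}");
}
```

`std::thread::available_parallelism` is a reasonable stdlib substitute here, though it can differ from `num_cpus::get()` under cgroup or affinity limits.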
5 changes: 1 addition & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -71,9 +71,6 @@ use crate::physical_plan::{
Statistics,
};

/// The number of files to read in parallel when inferring schema
const SCHEMA_INFERENCE_CONCURRENCY: usize = 32;

/// The Apache Parquet `FileFormat` implementation
///
/// Note it is recommended these are instead configured on the [`ConfigOptions`]
@@ -177,7 +174,7 @@ impl FileFormat for ParquetFormat {
let schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint))
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
.buffered(SCHEMA_INFERENCE_CONCURRENCY)
.buffered(state.config_options().execution.meta_fetch_concurrency)
.try_collect()
.await?;

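The `buffered(n)` combinator used above keeps at most `n` futures in flight while preserving input order. A standard-library sketch of the same bounded-concurrency idea using threads (the helper name `buffered_map` is hypothetical, and this chunked version is a simpler approximation: `buffered` refills as individual futures complete, while this waits for each chunk):

```rust
use std::thread;

/// Hypothetical std-only analogue of `stream::iter(xs).map(f).buffered(n)`:
/// run at most `n` workers at a time, preserving input order in the output.
fn buffered_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Copy + 'static,
{
    let mut out = Vec::new();
    let mut iter = items.into_iter();
    loop {
        // Launch up to `n` workers for the next chunk of items.
        let handles: Vec<_> = iter
            .by_ref()
            .take(n)
            .map(|item| thread::spawn(move || f(item)))
            .collect();
        if handles.is_empty() {
            break;
        }
        // Joining in spawn order keeps results aligned with the input.
        for h in handles {
            out.push(h.join().expect("worker panicked"));
        }
    }
    out
}

fn main() {
    // Simulate fetching schema metadata for 8 files, 3 at a time.
    let files: Vec<u32> = (0..8).collect();
    let sizes = buffered_map(files, 3, |f| f * 10);
    assert_eq!(sizes, vec![0, 10, 20, 30, 40, 50, 60, 70]);
    println!("{sizes:?}");
}
```

The point of the bound is the same as in the diff: without it, listing a table with thousands of files would issue thousands of concurrent object-store requests.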
64 changes: 35 additions & 29 deletions datafusion/core/src/datasource/listing/table.rs
@@ -934,38 +934,44 @@ impl ListingTable {
let file_list = stream::iter(file_list).flatten();

// collect the statistics if required by the config
let files = file_list.then(|part_file| async {
let part_file = part_file?;
let mut statistics_result = Statistics::default();
if self.options.collect_stat {
let statistics_cache = self.collected_statistics.clone();
match statistics_cache.get_with_extra(
&part_file.object_meta.location,
&part_file.object_meta,
) {
Some(statistics) => statistics_result = statistics.as_ref().clone(),
None => {
let statistics = self
.options
.format
.infer_stats(
ctx,
&store,
self.file_schema.clone(),
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
let mut statistics_result = Statistics::default();
if self.options.collect_stat {
let statistics_cache = self.collected_statistics.clone();
match statistics_cache.get_with_extra(
&part_file.object_meta.location,
&part_file.object_meta,
) {
Some(statistics) => {
statistics_result = statistics.as_ref().clone()
}
None => {
let statistics = self
.options
.format
.infer_stats(
ctx,
&store,
self.file_schema.clone(),
&part_file.object_meta,
)
.await?;
statistics_cache.put_with_extra(
&part_file.object_meta.location,
statistics.clone().into(),
&part_file.object_meta,
)
.await?;
statistics_cache.put_with_extra(
&part_file.object_meta.location,
statistics.clone().into(),
&part_file.object_meta,
);
statistics_result = statistics;
);
statistics_result = statistics;
}
}
}
}
Ok((part_file, statistics_result)) as Result<(PartitionedFile, Statistics)>
});
Ok((part_file, statistics_result))
as Result<(PartitionedFile, Statistics)>
})
.boxed()
.buffered(ctx.config_options().execution.meta_fetch_concurrency);

let (files, statistics) =
get_statistics_with_limit(files, self.schema(), limit).await?;
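The body of each buffered future above follows a cache-or-compute pattern: `get_with_extra` only hits if the cached entry still matches the file's `ObjectMeta`, and a miss infers statistics and stores them with `put_with_extra`. A self-contained sketch of that pattern (all names here — `StatsCache`, `Stats`, `stats_for` — are hypothetical, with a modification stamp standing in for `ObjectMeta`):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Stats {
    num_rows: usize,
}

/// Hypothetical per-file statistics cache keyed by path; each entry also
/// remembers the file's modification stamp so stale entries are ignored.
struct StatsCache {
    entries: HashMap<String, (u64, Stats)>, // path -> (modified stamp, stats)
}

impl StatsCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Analogue of `get_with_extra`: a hit only counts if the stored
    /// modification stamp still matches the file's current stamp.
    fn get(&self, path: &str, modified: u64) -> Option<Stats> {
        match self.entries.get(path) {
            Some((stamp, stats)) if *stamp == modified => Some(stats.clone()),
            _ => None,
        }
    }

    /// Analogue of `put_with_extra`.
    fn put(&mut self, path: &str, modified: u64, stats: Stats) {
        self.entries.insert(path.to_string(), (modified, stats));
    }
}

/// Cache-or-compute: return cached stats if fresh, otherwise "infer" them
/// (a constant stands in for the real inference) and cache the result.
fn stats_for(cache: &mut StatsCache, path: &str, modified: u64) -> Stats {
    if let Some(hit) = cache.get(path, modified) {
        return hit; // cached and still fresh
    }
    let computed = Stats { num_rows: 42 };
    cache.put(path, modified, computed.clone());
    computed
}

fn main() {
    let mut cache = StatsCache::new();
    let first = stats_for(&mut cache, "part-0.parquet", 100);
    let second = stats_for(&mut cache, "part-0.parquet", 100); // cache hit
    assert_eq!(first, second);
    // A newer modification stamp invalidates the entry.
    assert!(cache.get("part-0.parquet", 200).is_none());
    println!("ok");
}
```

Keying the freshness check on file metadata is what lets the cache survive across queries without serving statistics for a file that has since been rewritten.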
7 changes: 7 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -131,6 +131,12 @@ SET datafusion.execution.target_partitions=7
statement ok
SET datafusion.execution.planning_concurrency=13

# meta_fetch_concurrency defaults to num_cores, so set
# to a known value that is unlikely to be
# the real number of cores on a system
statement ok
SET datafusion.execution.meta_fetch_concurrency=13

# pin the version string for test
statement ok
SET datafusion.execution.parquet.created_by=datafusion
@@ -150,6 +156,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.meta_fetch_concurrency 13
datafusion.execution.parquet.allow_single_file_parallelism false
datafusion.execution.parquet.bloom_filter_enabled false
datafusion.execution.parquet.bloom_filter_fpp NULL
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -76,6 +76,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
| datafusion.execution.meta_fetch_concurrency | 0 | Number of files to read in parallel when inferring schema and statistics. Defaults to the number of CPU cores on the system |
Contributor:
The default value of 0 seems strange here, but it looks like a bug with the script that makes this page, given the same issue appears there:

| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system

Contributor Author:
Yes, the default is `num_cpus::get()`, so it has no fixed value to display.

| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |