Skip to content

Commit

Permalink
Improve documentation about ParquetExec / Parquet predicate pushdown (
Browse files Browse the repository at this point in the history
#11994)

* Minor: improve ParquetExec docs

* typo

* clippy

* fix whitespace so rustdoc does not treat as tests

* Apply suggestions from code review

Co-authored-by: Oleks V <[email protected]>

* expound upon column rewriting in the context of schema evolution

---------

Co-authored-by: Oleks V <[email protected]>
  • Loading branch information
alamb and comphead authored Aug 16, 2024
1 parent dc84fa5 commit 2a16704
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 65 deletions.
3 changes: 3 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ pub trait TreeNodeVisitor<'n>: Sized {
/// A [Visitor](https://en.wikipedia.org/wiki/Visitor_pattern) for recursively
/// rewriting [`TreeNode`]s via [`TreeNode::rewrite`].
///
/// For example you can implement this trait on a struct to rewrite `Expr` or
/// `LogicalPlan` that needs to track state during the rewrite.
///
/// See [`TreeNode`] for more details on available APIs
///
/// When passed to [`TreeNode::rewrite`], [`TreeNodeRewriter::f_down`] and
Expand Down
60 changes: 38 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,12 @@ pub use writer::plan_to_parquet;
///
/// Supports the following optimizations:
///
/// * Concurrent reads: Can read from one or more files in parallel as multiple
/// * Concurrent reads: reads from one or more files in parallel as multiple
/// partitions, including concurrently reading multiple row groups from a single
/// file.
///
/// * Predicate push down: skips row groups and pages based on
/// min/max/null_counts in the row group metadata, the page index and bloom
/// filters.
/// * Predicate push down: skips row groups, pages, rows based on metadata
/// and late materialization. See "Predicate Pushdown" below.
///
/// * Projection pushdown: reads and decodes only the columns required.
///
Expand All @@ -132,9 +131,8 @@ pub use writer::plan_to_parquet;
/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
/// details.
///
/// * Schema adapters: read parquet files with different schemas into a unified
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
/// * Schema evolution: read parquet files with different schemas into a unified
/// table schema. See [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
Expand All @@ -144,6 +142,29 @@ pub use writer::plan_to_parquet;
/// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages
/// based on external information. See "Implementing External Indexes" below
///
/// # Predicate Pushdown
///
/// `ParquetExec` uses the provided [`PhysicalExpr`] predicate as a filter to
/// skip reading unnecessary data and improve query performance using several techniques:
///
/// * Row group pruning: skips entire row groups based on min/max statistics
/// found in [`ParquetMetaData`] and any Bloom filters that are present.
///
/// * Page pruning: skips individual pages within a ColumnChunk using the
/// [Parquet PageIndex], if present.
///
/// * Row filtering: skips rows within a page using a form of late
/// materialization. When possible, predicates are applied by the parquet
/// decoder *during* decode (see [`ArrowPredicate`] and [`RowFilter`] for more
/// details). This is only enabled if `ParquetScanOptions::pushdown_filters` is set to true.
///
/// Note: If the predicate can not be used to accelerate the scan, it is ignored
/// (no error is raised on predicate evaluation errors).
///
/// [`ArrowPredicate`]: parquet::arrow::arrow_reader::ArrowPredicate
/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
///
/// # Implementing External Indexes
///
/// It is possible to restrict the row groups and selections within those row
Expand Down Expand Up @@ -199,10 +220,11 @@ pub use writer::plan_to_parquet;
/// applying predicates to metadata. The plan and projections are used to
/// determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
/// * Step 4: The stream begins reading data, fetching the required parquet
/// pages incrementally decoding them, and applying any row filters (see
/// [`Self::with_pushdown_filters`]).
///
/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a
/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
Expand Down Expand Up @@ -268,13 +290,10 @@ impl ParquetExecBuilder {
}
}

/// Set the predicate for the scan.
///
/// The ParquetExec uses this predicate to filter row groups and data pages
/// using the Parquet statistics and bloom filters.
/// Set the filter predicate when reading.
///
/// If the predicate can not be used to prune the scan, it is ignored (no
/// error is raised).
/// See the "Predicate Pushdown" section of the [`ParquetExec`] documenation
/// for more details.
pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
self.predicate = Some(predicate);
self
Expand All @@ -291,7 +310,7 @@ impl ParquetExecBuilder {
self
}

/// Set the table parquet options that control how the ParquetExec reads.
/// Set the options for controlling how the ParquetExec reads parquet files.
///
/// See also [`Self::new_with_options`]
pub fn with_table_parquet_options(
Expand Down Expand Up @@ -480,11 +499,8 @@ impl ParquetExec {
self
}

/// If true, any filter [`Expr`]s on the scan will converted to a
/// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
/// `ParquetRecordBatchStream`. These filters are applied by the
/// parquet decoder to skip unecessairly decoding other columns
/// which would not pass the predicate. Defaults to false
/// If true, the predicate will be used during the parquet scan.
/// Defaults to false
///
/// [`Expr`]: datafusion_expr::Expr
pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
Expand Down
Loading

0 comments on commit 2a16704

Please sign in to comment.