Skip to content

Commit

Permalink
[minor] make recursive package dependency optional (apache#13778)
Browse files Browse the repository at this point in the history
* make recursive optional

* add to default for common package

* cargo update

* added to readme

* make test conditional

* reviews

* cargo update

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
2 people authored and zhuqi-lucas committed Dec 23, 2024
1 parent 0713016 commit 5f7daca
Show file tree
Hide file tree
Showing 17 changed files with 54 additions and 50 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ Default features:
- `parquet`: support for reading the [Apache Parquet] format
- `regex_expressions`: regular expression functions, such as `regexp_match`
- `unicode_expressions`: Include unicode aware functions such as `character_length`
- `unparser` : enables support to reverse LogicalPlans back into SQL
- `unparser`: enables support to reverse LogicalPlans back into SQL
- `recursive-protection`: uses [recursive](https://docs.rs/recursive/latest/recursive/) for stack overflow protection.

Optional features:

Expand Down
1 change: 0 additions & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ name = "datafusion_common"
path = "src/lib.rs"

[features]
default = ["recursive-protection"]
avro = ["apache-avro"]
backtrace = []
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
force_hash_collisions = []
recursive-protection = ["dep:recursive"]

[dependencies]
ahash = { workspace = true }
Expand All @@ -62,7 +64,7 @@ object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.22.0", optional = true }
recursive = { workspace = true }
recursive = { workspace = true, optional = true }
sqlparser = { workspace = true }
tokio = { workspace = true }

Expand Down
14 changes: 7 additions & 7 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! [`TreeNode`] for visiting and rewriting expression and plan trees
use crate::Result;
use recursive::recursive;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
Expand Down Expand Up @@ -125,7 +124,7 @@ pub trait TreeNode: Sized {
/// TreeNodeVisitor::f_up(ChildNode2)
/// TreeNodeVisitor::f_up(ParentNode)
/// ```
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>(
&'n self,
visitor: &mut V,
Expand Down Expand Up @@ -175,7 +174,7 @@ pub trait TreeNode: Sized {
/// TreeNodeRewriter::f_up(ChildNode2)
/// TreeNodeRewriter::f_up(ParentNode)
/// ```
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn rewrite<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -198,7 +197,7 @@ pub trait TreeNode: Sized {
&'n self,
mut f: F,
) -> Result<TreeNodeRecursion> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result<TreeNodeRecursion>>(
node: &'n N,
f: &mut F,
Expand Down Expand Up @@ -233,7 +232,7 @@ pub trait TreeNode: Sized {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_down_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
node: N,
f: &mut F,
Expand All @@ -257,7 +256,7 @@ pub trait TreeNode: Sized {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_up_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
node: N,
f: &mut F,
Expand Down Expand Up @@ -372,7 +371,7 @@ pub trait TreeNode: Sized {
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_down_up_impl<
N: TreeNode,
FD: FnMut(N) -> Result<Transformed<N>>,
Expand Down Expand Up @@ -2350,6 +2349,7 @@ pub(crate) mod tests {
Ok(())
}

#[cfg(feature = "recursive-protection")]
#[test]
fn test_large_tree() {
let mut item = TestTreeNode::new_leaf("initial".to_string());
Expand Down
4 changes: 3 additions & 1 deletion datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ name = "datafusion_expr"
path = "src/lib.rs"

[features]
default = ["recursive-protection"]
recursive-protection = ["dep:recursive"]

[dependencies]
arrow = { workspace = true }
Expand All @@ -48,7 +50,7 @@ datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
indexmap = { workspace = true }
paste = "^1.0"
recursive = { workspace = true }
recursive = { workspace = true, optional = true }
serde_json = { workspace = true }
sqlparser = { workspace = true }

Expand Down
3 changes: 1 addition & 2 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion_common::{
TableReference,
};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use recursive::recursive;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -100,7 +99,7 @@ impl ExprSchemable for Expr {
/// expression refers to a column that does not exist in the
/// schema, or when the expression is incorrectly typed
/// (e.g. `[utf8] + [bool]`).
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
match self {
Expr::Alias(Alias { expr, name, .. }) => match &**expr {
Expand Down
13 changes: 6 additions & 7 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::{
UserDefinedLogicalNode, Values, Window,
};
use datafusion_common::tree_node::TreeNodeRefContainer;
use recursive::recursive;

use crate::expr::{Exists, InSubquery};
use datafusion_common::tree_node::{
Expand Down Expand Up @@ -669,7 +668,7 @@ impl LogicalPlan {

/// Visits a plan similarly to [`Self::visit`], including subqueries that
/// may appear in expressions such as `IN (SELECT ...)`.
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
&self,
visitor: &mut V,
Expand All @@ -688,7 +687,7 @@ impl LogicalPlan {
/// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -707,7 +706,7 @@ impl LogicalPlan {
&self,
mut f: F,
) -> Result<TreeNodeRecursion> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn apply_with_subqueries_impl<
F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
>(
Expand Down Expand Up @@ -742,7 +741,7 @@ impl LogicalPlan {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_down_with_subqueries_impl<
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
>(
Expand All @@ -767,7 +766,7 @@ impl LogicalPlan {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_up_with_subqueries_impl<
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
>(
Expand Down Expand Up @@ -795,7 +794,7 @@ impl LogicalPlan {
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_down_up_with_subqueries_impl<
FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ workspace = true
name = "datafusion_optimizer"
path = "src/lib.rs"

[features]
default = ["recursive-protection"]
recursive-protection = ["dep:recursive"]

[dependencies]
arrow = { workspace = true }
chrono = { workspace = true }
Expand All @@ -44,7 +48,7 @@ datafusion-physical-expr = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true }
recursive = { workspace = true, optional = true }
regex = { workspace = true }
regex-syntax = "0.8.0"

Expand Down
7 changes: 3 additions & 4 deletions datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use crate::analyzer::check_plan;
use crate::utils::collect_subquery_cols;
use recursive::recursive;

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{plan_err, Result};
Expand Down Expand Up @@ -79,7 +78,7 @@ pub fn check_subquery_expr(
match outer_plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_) => Ok(()),
LogicalPlan::Aggregate(Aggregate {group_expr, aggr_expr,..}) => {
LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. }) => {
if group_expr.contains(expr) && !aggr_expr.contains(expr) {
// TODO revisit this validation logic
plan_err!(
Expand All @@ -88,7 +87,7 @@ pub fn check_subquery_expr(
} else {
Ok(())
}
},
}
_ => plan_err!(
"Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes"
)
Expand Down Expand Up @@ -129,7 +128,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> {
}

// Recursively check the unsupported outer references in the sub query plan.
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> {
if !can_contain_outer_ref && inner_plan.contains_outer_reference() {
return plan_err!("Accessing outer reference columns is not allowed in the plan");
Expand Down
5 changes: 2 additions & 3 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;

use crate::optimizer::ApplyOrder;
use crate::utils::NamePreserver;
Expand Down Expand Up @@ -532,7 +531,7 @@ impl OptimizerRule for CommonSubexprEliminate {
None
}

#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down Expand Up @@ -952,7 +951,7 @@ mod test {
)?
.build()?;

let expected ="Aggregate: groupBy=[[]], aggr=[[avg(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\
let expected = "Aggregate: groupBy=[[]], aggr=[[avg(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\
\n Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b, test.c\
\n TableScan: test";

Expand Down
21 changes: 10 additions & 11 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available.
use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;
use std::sync::Arc;

use crate::join_key_set::JoinKeySet;
Expand Down Expand Up @@ -80,7 +79,7 @@ impl OptimizerRule for EliminateCrossJoin {
true
}

#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down Expand Up @@ -651,7 +650,7 @@ mod tests {
" Inner Join: t1.a = t2.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Inner Join: t1.a = t3.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t3 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t3 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]"
];

Expand Down Expand Up @@ -1237,10 +1236,10 @@ mod tests {
.build()?;

let expected = vec![
"Filter: t1.a + UInt32(100) = t2.a * UInt32(2) OR t2.b = t1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Cross Join: [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
"Filter: t1.a + UInt32(100) = t2.a * UInt32(2) OR t2.b = t1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Cross Join: [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
];

assert_optimized_plan_eq(plan, expected);
Expand Down Expand Up @@ -1293,10 +1292,10 @@ mod tests {
.build()?;

let expected = vec![
"Filter: t2.c < UInt32(15) OR t2.c = UInt32(688) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Inner Join: t1.a + UInt32(100) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
"Filter: t2.c < UInt32(15) OR t2.c = UInt32(688) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Inner Join: t1.a + UInt32(100) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
];

assert_optimized_plan_eq(plan, expected);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod required_indices;

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -110,7 +109,7 @@ impl OptimizerRule for OptimizeProjections {
/// columns.
/// - `Ok(None)`: Signal that the given logical plan did not require any change.
/// - `Err(error)`: An error occurred during the optimization process.
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn optimize_projections(
plan: LogicalPlan,
config: &dyn OptimizerConfig,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ rust-version = { workspace = true }
[lints]
workspace = true

[features]
default = ["recursive-protection"]
recursive-protection = ["dep:recursive"]

[dependencies]
arrow = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
Expand All @@ -40,7 +44,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true }
recursive = { workspace = true, optional = true }

[dev-dependencies]
datafusion-expr = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
use datafusion_physical_plan::{expressions, ExecutionPlan};
use recursive::recursive;
use std::sync::Arc;

use crate::PhysicalOptimizerRule;
Expand All @@ -42,7 +41,7 @@ impl AggregateStatistics {
}

impl PhysicalOptimizerRule for AggregateStatistics {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
Expand Down
Loading

0 comments on commit 5f7daca

Please sign in to comment.