Skip to content

Commit

Permalink
feat(substrait): add wildcard handling to producer (#12987)
Browse files Browse the repository at this point in the history
* feat(substrait): add wildcard expand rule in producer

* add comment describing need for ExpandWildcardRule
  • Loading branch information
tokoko authored Oct 18, 2024
1 parent 97f7491 commit 42f9060
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
10 changes: 9 additions & 1 deletion datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::config::ConfigOptions;
use datafusion::optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use std::sync::Arc;
use substrait::proto::expression_reference::ExprType;

Expand Down Expand Up @@ -103,9 +106,14 @@ pub fn to_substrait_plan(plan: &LogicalPlan, ctx: &SessionContext) -> Result<Box
// Parse relation nodes
// Generate PlanRel(s)
// Note: Only 1 relation tree is currently supported

// We have to expand wildcard expressions first as wildcards can't be represented in substrait
let plan = Arc::new(ExpandWildcardRule::new())
.analyze(plan.clone(), &ConfigOptions::default())?;

let plan_rels = vec![PlanRel {
rel_type: Some(plan_rel::RelType::Root(RelRoot {
input: Some(*to_substrait_rel(plan, ctx, &mut extensions)?),
input: Some(*to_substrait_rel(&plan, ctx, &mut extensions)?),
names: to_substrait_named_struct(plan.schema(), &mut extensions)?.names,
})),
}];
Expand Down
34 changes: 33 additions & 1 deletion datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,13 @@ async fn simple_select() -> Result<()> {

#[tokio::test]
async fn wildcard_select() -> Result<()> {
roundtrip("SELECT * FROM data").await
assert_expected_plan_unoptimized(
"SELECT * FROM data",
"Projection: data.a, data.b, data.c, data.d, data.e, data.f\
\n TableScan: data",
true,
)
.await
}

#[tokio::test]
Expand Down Expand Up @@ -1174,6 +1180,32 @@ async fn verify_post_join_filter_value(proto: Box<Plan>) -> Result<()> {
Ok(())
}

async fn assert_expected_plan_unoptimized(
sql: &str,
expected_plan_str: &str,
assert_schema: bool,
) -> Result<()> {
let ctx = create_context().await?;
let df = ctx.sql(sql).await?;
let plan = df.into_unoptimized_plan();
let proto = to_substrait_plan(&plan, &ctx)?;
let plan2 = from_substrait_plan(&ctx, &proto).await?;

println!("{plan}");
println!("{plan2}");

println!("{proto:?}");

if assert_schema {
assert_eq!(plan.schema(), plan2.schema());
}

let plan2str = format!("{plan2}");
assert_eq!(expected_plan_str, &plan2str);

Ok(())
}

async fn assert_expected_plan(
sql: &str,
expected_plan_str: &str,
Expand Down

0 comments on commit 42f9060

Please sign in to comment.