Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support prepare statement #4490

Merged
merged 20 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard
| Expr::QualifiedWildcard { .. } => {
| Expr::QualifiedWildcard { .. }
| Expr::Placeholder { .. } => {
*self.is_applicable = false;
Recursion::Stop(self)
}
Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
"Create physical name does not support qualified wildcard".to_string(),
)),
Expr::Placeholder { .. } => Err(DataFusionError::Internal(
"Create physical name does not support placeholder".to_string(),
)),
}
}

Expand Down Expand Up @@ -1031,6 +1034,14 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
LogicalPlan::Prepare(_) => {
// There is no default plan for "PREPARE" -- it must be
// handled at a higher level (so that the appropriate
// statement can be prepared)
Err(DataFusionError::Internal(
"Unsupported logical plan: Prepare".to_string(),
))
}
LogicalPlan::CreateCatalogSchema(_) => {
// There is no default plan for "CREATE SCHEMA".
// It must be handled at a higher level (so
Expand Down
40 changes: 40 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/prepare.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

##########
## Prepare Statement Tests
##########

statement ok
create table person (id int, first_name varchar, last_name varchar, age int, state varchar, salary double, birthday timestamp, "😀" int) as values (1, 'jane', 'smith', 20, 'MA', 100000.45, '2000-11-12T00:00:00'::timestamp, 99);

query C rowsort
select * from person;
----
1 jane smith 20 MA 100000.45 2000-11-12T00:00:00.000000000 99

# TODO: support error instead of panicking
# thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SQL(ParserError("Expected AS, found: SELECT"))', datafusion/core/tests/sqllogictests/src/main.rs:197:42
# statement error
# PREPARE AS SELECT id, age FROM person WHERE age = $foo

# TODO: this statement should work after we support EXECUTE statement and caching this prepare logical plan somewhere
# statement ok
# PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);

# And then we may want to add test_prepare_statement* here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we do not store the prepare logical plan anywhere and still throw error when we try to generate physical plan for it, this .stl tests are not that useful yet but they will be valuable after those are done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I can add a subtask to implement it under #4539 -- then in DataFusion we normally leave a link to the ticket as a comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All corresponding prepare tests are added here. They will be available for testing (mean many statement error will become statement ok) after we store the prepare logical plan and not throw error

11 changes: 11 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,14 @@ pub enum Expr {
/// List of grouping set expressions. Only valid in the context of an aggregate
/// GROUP BY expression list
GroupingSet(GroupingSet),
/// A place holder for parameters in a prepared statement
/// (e.g. `$foo` or `$1`)
Placeholder {
/// The identifier of the parameter (e.g, $1 or $foo)
id: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// The type the parameter will be filled in with
data_type: DataType,
},
}

/// Binary expression
Expand Down Expand Up @@ -528,6 +536,7 @@ impl Expr {
Expr::Literal(..) => "Literal",
Expr::Negative(..) => "Negative",
Expr::Not(..) => "Not",
Expr::Placeholder { .. } => "Placeholder",
Expr::QualifiedWildcard { .. } => "QualifiedWildcard",
Expr::ScalarFunction { .. } => "ScalarFunction",
Expr::ScalarSubquery { .. } => "ScalarSubquery",
Expand Down Expand Up @@ -980,6 +989,7 @@ impl fmt::Debug for Expr {
)
}
},
Expr::Placeholder { id, .. } => write!(f, "{}", id),
}
}
}
Expand Down Expand Up @@ -1263,6 +1273,7 @@ fn create_name(e: &Expr) -> Result<String> {
Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
"Create name does not support qualified wildcard".to_string(),
)),
Expr::Placeholder { id, .. } => Ok((*id).to_string()),
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl ExprRewritable for Expr {
key,
))
}
Expr::Placeholder { id, data_type } => Expr::Placeholder { id, data_type },
};

// now rewrite this expression itself
Expand Down
4 changes: 3 additions & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl ExprSchemable for Expr {
Expr::Like { .. } | Expr::ILike { .. } | Expr::SimilarTo { .. } => {
Ok(DataType::Boolean)
}
Expr::Placeholder { data_type, .. } => Ok(data_type.clone()),
Expr::Wildcard => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
Expand Down Expand Up @@ -198,7 +199,8 @@ impl ExprSchemable for Expr {
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::IsNotUnknown(_)
| Expr::Exists { .. } => Ok(false),
| Expr::Exists { .. }
| Expr::Placeholder { .. } => Ok(true),
Expr::InSubquery { expr, .. } => expr.nullable(input_schema),
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).is_nullable())
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ impl ExprVisitable for Expr {
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. } => Ok(visitor),
| Expr::QualifiedWildcard { .. }
| Expr::Placeholder { .. } => Ok(visitor),
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
let visitor = left.accept(visitor)?;
right.accept(visitor)
Expand Down
16 changes: 13 additions & 3 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use crate::{and, binary_expr, Operator};
use crate::{
logical_plan::{
Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection,
Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values,
Window,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan,
Union, Values, Window,
},
utils::{
can_hash, expand_qualified_wildcard, expand_wildcard,
Expand Down Expand Up @@ -118,6 +118,8 @@ impl LogicalPlanBuilder {
/// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
/// The column names are not specified by the SQL standard and different database systems do it differently,
/// so it's usually better to override the default names with a table alias list.
///
/// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
pub fn values(mut values: Vec<Vec<Expr>>) -> Result<Self> {
if values.is_empty() {
return Err(DataFusionError::Plan("Values list cannot be empty".into()));
Expand Down Expand Up @@ -279,6 +281,14 @@ impl LogicalPlanBuilder {
)?)))
}

pub fn prepare(&self, name: String, data_types: Vec<DataType>) -> Result<Self> {
Ok(Self::from(LogicalPlan::Prepare(Prepare {
name,
data_types,
input: Arc::new(self.plan.clone()),
})))
}

/// Limit the number of rows returned
///
/// `skip` - Number of rows to skip before fetch any row.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Prepare, Projection, Repartition,
SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, Values, Window,
};
Expand Down
36 changes: 29 additions & 7 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub enum LogicalPlan {
Distinct(Distinct),
/// Set a Variable
SetVariable(SetVariable),
/// Prepare a statement
Prepare(Prepare),
}

impl LogicalPlan {
Expand All @@ -136,6 +138,7 @@ impl LogicalPlan {
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
}
LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
Expand Down Expand Up @@ -203,8 +206,9 @@ impl LogicalPlan {
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. })
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::Distinct(Distinct { input, .. }) => input.all_schemas(),
| LogicalPlan::Filter(Filter { input, .. })
| LogicalPlan::Distinct(Distinct { input, .. })
| LogicalPlan::Prepare(Prepare { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::SetVariable(_) => vec![],
Expand Down Expand Up @@ -273,7 +277,8 @@ impl LogicalPlan {
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(_) => {
| LogicalPlan::Distinct(_)
| LogicalPlan::Prepare(_) => {
vec![]
}
}
Expand Down Expand Up @@ -302,7 +307,8 @@ impl LogicalPlan {
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. }) => {
| LogicalPlan::CreateView(CreateView { input, .. })
| LogicalPlan::Prepare(Prepare { input, .. }) => {
vec![input]
}
// plans without inputs
Expand Down Expand Up @@ -450,9 +456,8 @@ impl LogicalPlan {
input.accept(visitor)?
}
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. }) => {
input.accept(visitor)?
}
| LogicalPlan::CreateView(CreateView { input, .. })
| LogicalPlan::Prepare(Prepare { input, .. }) => input.accept(visitor)?,
LogicalPlan::Extension(extension) => {
for input in extension.node.inputs() {
if !input.accept(visitor)? {
Expand Down Expand Up @@ -963,6 +968,11 @@ impl LogicalPlan {
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
LogicalPlan::Prepare(Prepare {
name, data_types, ..
}) => {
write!(f, "Prepare: {:?} {:?} ", name, data_types)
}
}
}
}
Expand Down Expand Up @@ -1360,6 +1370,18 @@ pub struct CreateExternalTable {
pub options: HashMap<String, String>,
}

/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
/// `Expr::Placeholder` expressions that are filled in during execution
#[derive(Clone)]
pub struct Prepare {
/// The name of the statement
pub name: String,
/// Data types of the parameters ([`Expr::Placeholder`])
pub data_types: Vec<DataType>,
/// The logical plan of the statements
pub input: Arc<LogicalPlan>,
}

Comment on lines +1393 to +1397
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the data types Vec size is the same with the place holders in the input plan, but is there any check for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data types of the placeholders are from the data types of the Vec here so they match. We do check if the Vec contains enough params, too. However, there are flexibility:

  1. The length of the Vec can be longer than the number of the placeholders which will be fine and we have tests for this.
  2. The data type of the Vec can be anything and we will use them for the placeholders, we do not check if the data types are compatible with the variables in the expression because: (i) we allow data type casting, and (2) when we reach the placeholders, we no longer have the context which variable/column/expression the place holder is used for. We do not want to add more context to backtrack which will cost compile time as well as complicated implementation.

However, I am working on #4550 that convert Prepare Logical Plan to a logical plan with all placeholders replaced with actual values. There, I will throw error if the data types provided do not work. We follow the same behavior of Postgres

/// Produces a relation with string representations of
/// various parts of the plan
#[derive(Clone)]
Expand Down
14 changes: 11 additions & 3 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
use crate::logical_plan::builder::build_join_schema;
use crate::logical_plan::{
Aggregate, Analyze, CreateMemoryTable, CreateView, Distinct, Extension, Filter, Join,
Limit, Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
Values, Window,
Limit, Partitioning, Prepare, Projection, Repartition, Sort, Subquery, SubqueryAlias,
Union, Values, Window,
};
use crate::{Cast, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
use arrow::datatypes::{DataType, TimeUnit};
Expand Down Expand Up @@ -126,7 +126,8 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::GetIndexedField { .. } => {}
| Expr::GetIndexedField { .. }
| Expr::Placeholder { .. } => {}
}
Ok(Recursion::Continue(self))
}
Expand Down Expand Up @@ -579,6 +580,13 @@ pub fn from_plan(

Ok(plan.clone())
}
LogicalPlan::Prepare(Prepare {
name, data_types, ..
}) => Ok(LogicalPlan::Prepare(Prepare {
name: name.clone(),
data_types: data_types.clone(),
input: Arc::new(inputs[0].clone()),
})),
Comment on lines +586 to +589
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible and allowed here that the method passed in a totally different input plan ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the code we implement, the answer is no unless there are bugs

LogicalPlan::EmptyRelation(_)
| LogicalPlan::TableScan { .. }
| LogicalPlan::CreateExternalTable(_)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::DropView(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension(_) => {
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, optimizer_config)
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ fn optimize_plan(
| LogicalPlan::SetVariable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension { .. } => {
| LogicalPlan::Extension { .. }
| LogicalPlan::Prepare(_) => {
let expr = plan.expressions();
// collect all required columns by this plan
exprlist_to_columns(&expr, &mut new_required_columns)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ impl<'a> ConstEvaluator<'a> {
| Expr::Sort { .. }
| Expr::GroupingSet(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. } => false,
| Expr::QualifiedWildcard { .. }
| Expr::Placeholder { .. } => false,
Expr::ScalarFunction { fun, .. } => Self::volatility_ok(fun.volatility()),
Expr::ScalarUDF { fun, .. } => Self::volatility_ok(fun.signature.volatility),
Expr::Literal(_)
Expand Down
14 changes: 14 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ message LogicalPlanNode {
DistinctNode distinct = 23;
ViewTableScanNode view_scan = 24;
CustomTableScanNode custom_scan = 25;
PrepareNode prepare = 26;
}
}

Expand Down Expand Up @@ -181,6 +182,12 @@ message CreateExternalTableNode {
map<string, string> options = 11;
}

message PrepareNode {
string name = 1;
repeated ArrowType data_types = 2;
LogicalPlanNode input = 3;
}

message CreateCatalogSchemaNode {
string schema_name = 1;
bool if_not_exists = 2;
Expand Down Expand Up @@ -345,9 +352,16 @@ message LogicalExprNode {
ILikeNode ilike = 32;
SimilarToNode similar_to = 33;

PlaceholderNode placeholder = 34;

}
}

message PlaceholderNode {
string id = 1;
ArrowType data_type = 2;
}

message LogicalExprList {
repeated LogicalExprNode expr = 1;
}
Expand Down
Loading