From b6b652560f7fea726cc50632d3f6c957da48b259 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 24 Oct 2024 17:21:59 +0800 Subject: [PATCH 1/2] fix: planning of prepare statement with limit clause --- datafusion/sql/src/query.rs | 10 ++++++---- datafusion/sql/tests/sql_integration.rs | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 842a1c0cbec1..1ef009132f9e 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -53,7 +53,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // so we need to process `SELECT` and `ORDER BY` together. let oby_exprs = to_order_by_exprs(query.order_by)?; let plan = self.select_to_plan(*select, oby_exprs, planner_context)?; - let plan = self.limit(plan, query.offset, query.limit)?; + let plan = + self.limit(plan, query.offset, query.limit, planner_context)?; // Process the `SELECT INTO` after `LIMIT`. self.select_into(plan, select_into) } @@ -68,7 +69,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { None, )?; let plan = self.order_by(plan, order_by_rex)?; - self.limit(plan, query.offset, query.limit) + self.limit(plan, query.offset, query.limit, planner_context) } } } @@ -79,6 +80,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { input: LogicalPlan, skip: Option, fetch: Option, + planner_context: &mut PlannerContext, ) -> Result { if skip.is_none() && fetch.is_none() { return Ok(input); @@ -88,10 +90,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let empty_schema = DFSchema::empty(); let skip = skip - .map(|o| self.sql_to_expr(o.value, &empty_schema, &mut PlannerContext::new())) + .map(|o| self.sql_to_expr(o.value, &empty_schema, planner_context)) .transpose()?; let fetch = fetch - .map(|e| self.sql_to_expr(e, &empty_schema, &mut PlannerContext::new())) + .map(|e| self.sql_to_expr(e, &empty_schema, planner_context)) .transpose()?; LogicalPlanBuilder::from(input) .limit_by_expr(skip, fetch)? diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index edb614493b38..015bb0b7e8f9 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4209,6 +4209,29 @@ fn test_prepare_statement_to_plan_having() { prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan); } +#[test] +fn test_prepare_statement_to_plan_limit() { + let sql = "PREPARE my_plan(BIGINT, BIGINT) AS + SELECT id FROM person \ + LIMIT $1 OFFSET $2"; + + let expected_plan = "Prepare: \"my_plan\" [Int64, Int64] \ + \n Limit: skip=$2, fetch=$1\ + \n Projection: person.id\ + \n TableScan: person"; + + let expected_dt = "[Int64, Int64]"; + + let plan = prepare_stmt_quick_test(sql, expected_plan, expected_dt); + + // replace params with values + let param_values = vec![ScalarValue::Int64(Some(10)), ScalarValue::Int64(Some(200))]; + let expected_plan = "Limit: skip=200, fetch=10\ + \n Projection: person.id\ + \n TableScan: person"; + prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan); +} + #[test] fn test_prepare_statement_to_plan_value_list() { let sql = "PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);"; From 1a13a89b4954ecf1eadb72739431033145af968c Mon Sep 17 00:00:00 2001 From: jonahgao Date: Thu, 24 Oct 2024 17:38:52 +0800 Subject: [PATCH 2/2] Improve test --- datafusion/sql/tests/sql_integration.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 015bb0b7e8f9..698c408e538f 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4213,10 +4213,10 @@ fn test_prepare_statement_to_plan_having() { fn test_prepare_statement_to_plan_limit() { let sql = "PREPARE my_plan(BIGINT, BIGINT) AS SELECT id FROM person \ - LIMIT $1 OFFSET $2"; + OFFSET $1 LIMIT $2"; let expected_plan = "Prepare: \"my_plan\" [Int64, Int64] \ - \n Limit: skip=$2, fetch=$1\ + \n Limit: skip=$1, fetch=$2\ \n Projection: person.id\ \n TableScan: person"; @@ -4226,7 +4226,7 @@ fn test_prepare_statement_to_plan_limit() { // replace params with values let param_values = vec![ScalarValue::Int64(Some(10)), ScalarValue::Int64(Some(200))]; - let expected_plan = "Limit: skip=200, fetch=10\ + let expected_plan = "Limit: skip=10, fetch=200\ \n Projection: person.id\ \n TableScan: person"; prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);