Skip to content

Commit

Permalink
planner: Handle ORDER BY inside derived tables (#16353)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
Signed-off-by: Andres Taylor <[email protected]>
Co-authored-by: Manan Gupta <[email protected]>
  • Loading branch information
systay and GuptaManan100 authored Jul 9, 2024
1 parent 0a7b973 commit eb29999
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 75 deletions.
16 changes: 13 additions & 3 deletions go/test/endtoend/vtgate/vitess_tester/aggregation/aggregation.test
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CREATE TABLE `t1`

CREATE TABLE `t2`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`t1_id` int unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE InnoDB,
Expand Down Expand Up @@ -38,7 +38,10 @@ values (1, 1),

insert into t3 (id, name)
values (1, 'A'),
(2, 'B');
(2, 'B'),
(3, 'B'),
(4, 'B'),
(5, 'B');

-- wait_authoritative t1
-- wait_authoritative t2
Expand All @@ -47,4 +50,11 @@ select group_concat(t3.name SEPARATOR ', ') as "Group Name"
from t1
join t2 on t1.id = t2.t1_id
left join t3 on t1.id = t3.id
group by t1.id;
group by t1.id;

select COUNT(*)
from (select 1 as one
FROM `t3`
WHERE `t3`.`name` = 'B'
ORDER BY id DESC LIMIT 25
OFFSET 0) subquery_for_count;
8 changes: 0 additions & 8 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,6 @@ func (a *Aggregator) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e
return offset
}

func (a *Aggregator) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, groupby []bool, exprs []*sqlparser.AliasedExpr) (offsets []int) {
for i, ae := range exprs {
offset := a.addColumnWithoutPushing(ctx, ae, groupby[i])
offsets = append(offsets, offset)
}
return
}

func (a *Aggregator) isDerived() bool {
return a.DT != nil
}
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,18 @@ func expandUnionHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, unio
}

func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel *sqlparser.Select) (Operator, *ApplyResult) {
op := createProjectionFromSelect(ctx, horizon)
qp := horizon.getQP(ctx)
var extracted []string

if horizon.IsDerived() {
// if we are dealing with a derived table, we need to make sure that the ordering columns
// are available outside the derived table
for _, order := range horizon.Query.GetOrderBy() {
qp.addColumn(ctx, order.Expr)
}
}

op := createProjectionFromSelect(ctx, horizon)
if qp.HasAggr {
extracted = append(extracted, "Aggregation")
} else {
Expand Down
9 changes: 0 additions & 9 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,6 @@ func (p *Projection) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e
return p.addColumn(ctx, true, false, expr, false)
}

func (p *Projection) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, _ []bool, exprs []*sqlparser.AliasedExpr) []int {
offsets := make([]int, len(exprs))
for idx, expr := range exprs {
offset := p.addColumn(ctx, reuse, false, expr, false)
offsets[idx] = offset
}
return offsets
}

func (p *Projection) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
cols, aliased := p.Columns.(AliasedProjections)
if !aliased {
Expand Down
17 changes: 17 additions & 0 deletions go/vt/vtgate/planbuilder/operators/queryprojection.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,23 @@ func (qp *QueryProjection) useGroupingOverDistinct(ctx *plancontext.PlanningCont
return true
}

// addColumn adds a column to the QueryProjection if it is not already present
func (qp *QueryProjection) addColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
for _, selectExpr := range qp.SelectExprs {
getExpr, err := selectExpr.GetExpr()
if err != nil {
continue
}
if ctx.SemTable.EqualsExprWithDeps(getExpr, expr) {
return
}
}
qp.SelectExprs = append(qp.SelectExprs, SelectExpr{
Col: aeWrap(expr),
Aggr: ctx.ContainsAggr(expr),
})
}

func checkForInvalidGroupingExpressions(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
if ctx.IsAggr(node) {
Expand Down
100 changes: 46 additions & 54 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,96 +587,87 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool,

// if at least one column is not already present, we check if we can easily find a projection
// or aggregation in our source that we can add to
derived, op, ok, offsets := addMultipleColumnsToInput(ctx, r.Source, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr})
r.Source = op
if ok {
return offsets[0]
derived, op, offset := addColumnToInput(ctx, r.Source, expr, reuse, gb)
if op != nil {
r.Source = op
}
if offset >= 0 {
return offset
}

// If no-one could be found, we probably don't have one yet, so we add one here
src := createProjection(ctx, r.Source, derived)
r.Source = src

offsets = src.addColumnsWithoutPushing(ctx, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr})
return offsets[0]
return src.addColumnWithoutPushing(ctx, expr, gb)
}

type selectExpressions interface {
Operator
addColumnWithoutPushing(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int
addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) []int
derivedName() string
}

// addColumnToInput adds columns to an operator without pushing them down
func addMultipleColumnsToInput(
func addColumnToInput(
ctx *plancontext.PlanningContext,
operator Operator,
reuse bool,
addToGroupBy []bool,
exprs []*sqlparser.AliasedExpr,
) (derivedName string, // if we found a derived table, this will contain its name
expr *sqlparser.AliasedExpr,
reuse, addToGroupBy bool,
) (
derivedName string, // if we found a derived table, this will contain its name
projection Operator, // if an operator needed to be built, it will be returned here
found bool, // whether a matching op was found or not
offsets []int, // the offsets the expressions received
offset int, // the offset of the expression, -1 if not found
) {
var src Operator
var updateSrc func(Operator)
switch op := operator.(type) {
case *SubQuery:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Outer, reuse, addToGroupBy, exprs)
if added {
op.Outer = src
}
return derivedName, op, added, offset

// Pass through operators - we can just add the columns to their source
case *SubQuery:
src, updateSrc = op.Outer, func(newSrc Operator) { op.Outer = newSrc }
case *Distinct:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *Limit:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *Ordering:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *LockAndComment:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }

// Union needs special handling, we can't really add new columns to all inputs
case *Union:
proj := wrapInDerivedProjection(ctx, op)
dtName, newOp, offset := addColumnToInput(ctx, proj, expr, reuse, addToGroupBy)
if newOp == nil {
newOp = proj
}
return derivedName, op, added, offset
return dtName, newOp, offset

// Horizon is another one of these - we can't really add new columns to it
case *Horizon:
// if the horizon has an alias, then it is a derived table,
// we have to add a new projection and can't build on this one
return op.Alias, op, false, nil
return op.Alias, nil, -1

case selectExpressions:
name := op.derivedName()
if name != "" {
// if the only thing we can push to is a derived table,
// we have to add a new projection and can't build on this one
return name, op, false, nil
return name, nil, -1
}
offset := op.addColumnsWithoutPushing(ctx, reuse, addToGroupBy, exprs)
return "", op, true, offset
offset := op.addColumnWithoutPushing(ctx, expr, addToGroupBy)
return "", nil, offset

case *Union:
proj := addDerivedProj(ctx, op)
return addMultipleColumnsToInput(ctx, proj, reuse, addToGroupBy, exprs)
default:
return "", op, false, nil
return "", nil, -1
}

// Handle the case where we have a pass-through operator
derivedName, src, offset = addColumnToInput(ctx, src, expr, reuse, addToGroupBy)
if src != nil {
updateSrc(src)
}
return derivedName, nil, offset
}

func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool) int {
Expand All @@ -691,7 +682,7 @@ func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool

ok, foundOffset := addWSColumnToInput(ctx, r.Source, offset)
if !ok {
src := addDerivedProj(ctx, r.Source)
src := wrapInDerivedProjection(ctx, r.Source)
r.Source = src
return src.AddWSColumn(ctx, offset, true)
}
Expand All @@ -714,7 +705,8 @@ func addWSColumnToInput(ctx *plancontext.PlanningContext, source Operator, offse
return false, -1
}

func addDerivedProj(
// wrapInDerivedProjection wraps the input in a derived table projection named "dt"
func wrapInDerivedProjection(
ctx *plancontext.PlanningContext,
op Operator,
) (projection *Projection) {
Expand Down
43 changes: 43 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -7144,6 +7144,49 @@
]
}
},
{
"comment": "Aggregation over a ORDER BY/LIMIT inside a derived table",
"query": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count",
"plan": {
"QueryType": "SELECT",
"Original": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "count_star(0) AS count(*)",
"Inputs": [
{
"OperatorType": "SimpleProjection",
"Columns": "2",
"Inputs": [
{
"OperatorType": "Limit",
"Count": "25",
"Offset": "0",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1",
"OrderBy": "(1|3) DESC",
"Query": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by id desc limit 25",
"Table": "`user`"
}
]
}
]
}
]
},
"TablesUsed": [
"user.user"
]
}
},
{
"comment": "should be able to push down aggregation",
"query": "select sum(user.type) from user join user_extra on user.team_id = user_extra.id group by user_extra.id order by user_extra.id",
Expand Down

0 comments on commit eb29999

Please sign in to comment.