Skip to content

Commit

Permalink
Make sure derived table column names are handled correctly (#15588)
Browse files Browse the repository at this point in the history
  • Loading branch information
systay authored Apr 29, 2024
1 parent ca2659d commit 11c8d3e
Show file tree
Hide file tree
Showing 36 changed files with 1,280 additions and 488 deletions.
72 changes: 71 additions & 1 deletion go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,64 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gro
return offset
}

func (a *Aggregator) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
if len(a.Columns) <= offset {
panic(vterrors.VT13001("offset out of range"))
}

var expr sqlparser.Expr
// first search for the offset among the groupings
for i, by := range a.Grouping {
if by.ColOffset != offset {
continue
}
if by.WSOffset >= 0 {
// ah, we already have a weigh_string for this column. let's return it as is
return by.WSOffset
}

// we need to add a WS column
a.Grouping[i].WSOffset = len(a.Columns)
expr = a.Columns[offset].Expr
break
}

if expr == nil {
for _, aggr := range a.Aggregations {
if aggr.ColOffset != offset {
continue
}
if aggr.WSOffset >= 0 {
// ah, we already have a weigh_string for this column. let's return it as is
return aggr.WSOffset
}

panic(vterrors.VT13001("expected to find a weight string for aggregation"))
}

panic(vterrors.VT13001("could not find expression at offset"))
}

wsExpr := weightStringFor(expr)
wsAe := aeWrap(wsExpr)

wsOffset := len(a.Columns)
a.Columns = append(a.Columns, wsAe)
if underRoute {
// if we are under a route, we are done here.
// the column will be use when creating the query to send to the tablet, and that is all we need
return wsOffset
}

incomingOffset := a.Source.AddWSColumn(ctx, offset, false)

if wsOffset != incomingOffset {
// TODO: we could handle this case by adding a projection on under the aggregator to make the columns line up
panic(errFailedToPlan(wsAe))
}
return wsOffset
}

func (a *Aggregator) findColInternal(ctx *plancontext.PlanningContext, ae *sqlparser.AliasedExpr, addToGroupBy bool) int {
expr := ae.Expr
offset := a.FindCol(ctx, expr, false)
Expand All @@ -211,8 +269,19 @@ func (a *Aggregator) findColInternal(ctx *plancontext.PlanningContext, ae *sqlpa
return -1
}

func isDerived(op Operator) bool {
switch op := op.(type) {
case *Horizon:
return op.IsDerived()
case selectExpressions:
return op.derivedName() != ""
default:
return false
}
}

func (a *Aggregator) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
if _, isSourceDerived := a.Source.(*Horizon); isSourceDerived {
if isDerived(a.Source) {
return a.Columns
}

Expand Down Expand Up @@ -278,6 +347,7 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) Operator {
if gb.ColOffset == -1 {
offset := a.internalAddColumn(ctx, aeWrap(gb.Inner), false)
a.Grouping[idx].ColOffset = offset
gb.ColOffset = offset
}
if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.Inner) {
continue
Expand Down
72 changes: 59 additions & 13 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,22 +237,52 @@ func (aj *ApplyJoin) AddColumn(
return offset
}

func (aj *ApplyJoin) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
if len(aj.Columns) == 0 {
aj.planOffsets(ctx)
}

if len(aj.Columns) <= offset {
panic(vterrors.VT13001("offset out of range"))
}

wsExpr := weightStringFor(aj.JoinColumns.columns[offset].Original)
if index := aj.FindCol(ctx, wsExpr, false); index != -1 {
// nice, we already have this column. no need to add anything
return index
}

i := aj.Columns[offset]
out := 0
if i < 0 {
out = aj.LHS.AddWSColumn(ctx, FromLeftOffset(i), underRoute)
out = ToLeftOffset(out)
aj.JoinColumns.addLeft(wsExpr)
} else {
out = aj.RHS.AddWSColumn(ctx, FromRightOffset(i), underRoute)
out = ToRightOffset(out)
aj.JoinColumns.addRight(wsExpr)
}

if out >= 0 {
aj.addOffset(out)
} else {
col := aj.getJoinColumnFor(ctx, aeWrap(wsExpr), wsExpr, !ContainsAggr(ctx, wsExpr))
aj.JoinColumns.add(col)
aj.planOffsetFor(ctx, col)
}

return len(aj.Columns) - 1
}

func (aj *ApplyJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
if len(aj.Columns) > 0 {
// we've already done offset planning
return aj
}
for _, col := range aj.JoinColumns.columns {
// Read the type description for applyJoinColumn to understand the following code
for _, lhsExpr := range col.LHSExprs {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(lhsExpr.Expr))
if col.RHSExpr == nil {
// if we don't have an RHS expr, it means that this is a pure LHS expression
aj.addOffset(-offset - 1)
} else {
aj.Vars[lhsExpr.Name] = offset
}
}
if col.RHSExpr != nil {
offset := aj.RHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.RHSExpr))
aj.addOffset(offset + 1)
}
aj.planOffsetFor(ctx, col)
}

for _, col := range aj.JoinPredicates.columns {
Expand All @@ -270,6 +300,22 @@ func (aj *ApplyJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
return nil
}

func (aj *ApplyJoin) planOffsetFor(ctx *plancontext.PlanningContext, col applyJoinColumn) {
for _, lhsExpr := range col.LHSExprs {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(lhsExpr.Expr))
if col.RHSExpr == nil {
// if we don't have an RHS expr, it means that this is a pure LHS expression
aj.addOffset(ToLeftOffset(offset))
} else {
aj.Vars[lhsExpr.Name] = offset
}
}
if col.RHSExpr != nil {
offset := aj.RHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.RHSExpr))
aj.addOffset(ToRightOffset(offset))
}
}

func (aj *ApplyJoin) addOffset(offset int) {
aj.Columns = append(aj.Columns, offset)
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (l *LockAndComment) AddColumn(ctx *plancontext.PlanningContext, reuseExisti
return l.Source.AddColumn(ctx, reuseExisting, addToGroupBy, expr)
}

func (l *LockAndComment) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return l.Source.AddWSColumn(ctx, offset, underRoute)
}

func (l *LockAndComment) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return l.Source.FindCol(ctx, expr, underRoute)
}
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,11 @@ func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) Operator {
for idx, col := range columns {
e := col.Expr
var wsCol *int
typ, _ := ctx.SemTable.TypeForExpr(e)

if ctx.SemTable.NeedsWeightString(e) {
offset := d.Source.AddColumn(ctx, true, false, aeWrap(weightStringFor(e)))
offset := d.Source.AddWSColumn(ctx, idx, false)
wsCol = &offset
}

typ, _ := ctx.SemTable.TypeForExpr(e)
d.Columns = append(d.Columns, engine.CheckCol{
Col: idx,
WsCol: wsCol,
Expand Down Expand Up @@ -94,6 +92,9 @@ func (d *Distinct) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser
func (d *Distinct) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool, expr *sqlparser.AliasedExpr) int {
return d.Source.AddColumn(ctx, reuse, gb, expr)
}
func (d *Distinct) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return d.Source.AddWSColumn(ctx, offset, underRoute)
}

func (d *Distinct) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return d.Source.FindCol(ctx, expr, underRoute)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (f *Filter) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr,
return f.Source.FindCol(ctx, expr, underRoute)
}

func (f *Filter) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return f.Source.AddWSColumn(ctx, offset, underRoute)
}

func (f *Filter) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return f.Source.GetColumns(ctx)
}
Expand Down
42 changes: 42 additions & 0 deletions go/vt/vtgate/planbuilder/operators/hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,32 @@ func (hj *HashJoin) AddColumn(ctx *plancontext.PlanningContext, reuseExisting bo
return len(hj.columns.columns) - 1
}

func (hj *HashJoin) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
hj.planOffsets(ctx)

if len(hj.ColumnOffsets) <= offset {
panic(vterrors.VT13001("offset out of range"))
}

// check if it already exists
wsExpr := weightStringFor(hj.columns.columns[offset].expr)
if index := hj.FindCol(ctx, wsExpr, false); index != -1 {
return index
}

i := hj.ColumnOffsets[offset]
out := 0
if i < 0 {
out = hj.LHS.AddWSColumn(ctx, FromLeftOffset(i), underRoute)
out = ToLeftOffset(out)
} else {
out = hj.RHS.AddWSColumn(ctx, FromRightOffset(i), underRoute)
out = ToRightOffset(out)
}
hj.ColumnOffsets = append(hj.ColumnOffsets, out)
return len(hj.ColumnOffsets) - 1
}

func (hj *HashJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
if hj.offset {
return nil
Expand Down Expand Up @@ -450,3 +476,19 @@ func (hj *HashJoin) addSingleSidedColumn(
Info: &EvalEngine{EExpr: eexpr},
}, isPureOffset
}

func FromLeftOffset(i int) int {
return -i - 1
}

func ToLeftOffset(i int) int {
return -i - 1
}

func FromRightOffset(i int) int {
return i - 1
}

func ToRightOffset(i int) int {
return i + 1
}
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (h *Horizon) AddColumn(ctx *plancontext.PlanningContext, reuse bool, _ bool
return offset
}

func (h *Horizon) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
panic(errNoNewColumns)
}

var errNoNewColumns = vterrors.VT13001("can't add new columns to Horizon")

// canReuseColumn is generic, so it can be used with slices of different types.
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (l *Limit) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool,
return l.Source.AddColumn(ctx, reuse, gb, expr)
}

func (l *Limit) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return l.Source.AddWSColumn(ctx, offset, underRoute)
}

func (l *Limit) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return l.Source.FindCol(ctx, expr, underRoute)
}
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package operators
import (
"fmt"

"vitess.io/vitess/go/vt/vtgate/engine/opcode"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine/opcode"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/semantics"
)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/planbuilder/operators/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type (

AddColumn(ctx *plancontext.PlanningContext, reuseExisting bool, addToGroupBy bool, expr *sqlparser.AliasedExpr) int

// AddWSColumn is used to add a weight_string column to the operator
AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int

FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int

GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (o *Ordering) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bo
return o.Source.AddColumn(ctx, reuse, gb, expr)
}

func (o *Ordering) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return o.Source.AddWSColumn(ctx, offset, underRoute)
}

func (o *Ordering) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return o.Source.FindCol(ctx, expr, underRoute)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p Phase) String() string {
case initialPlanning:
return "initial horizon planning optimization"
case pullDistinctFromUnion:
return "pull distinct from UNION1"
return "pull distinct from UNION"
case delegateAggregation:
return "split aggregation between vtgate and mysql"
case addAggrOrdering:
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/plan_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (noColumns) AddColumn(*plancontext.PlanningContext, bool, bool, *sqlparser.
panic(vterrors.VT13001("noColumns operators have no column"))
}

func (noColumns) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
panic(vterrors.VT13001("noColumns operators have no column"))
}

func (noColumns) GetColumns(*plancontext.PlanningContext) []*sqlparser.AliasedExpr {
panic(vterrors.VT13001("noColumns operators have no column"))
}
Expand Down
34 changes: 34 additions & 0 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,40 @@ func (p *Projection) addColumnsWithoutPushing(ctx *plancontext.PlanningContext,
return offsets
}

func (p *Projection) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
cols, aliased := p.Columns.(AliasedProjections)
if !aliased {
panic(vterrors.VT09015())
}

if offset >= len(cols) || offset < 0 {
panic(vterrors.VT13001(fmt.Sprintf("offset [%d] out of range [%d]", offset, len(cols))))
}

expr := cols[offset].EvalExpr
ws := weightStringFor(expr)
if offset := p.FindCol(ctx, ws, underRoute); offset >= 0 {
// if we already have this column, we can just return the offset
return offset
}

aeWs := aeWrap(ws)
pe := newProjExprWithInner(aeWs, ws)
if underRoute {
return p.addProjExpr(pe)
}

// we need to push down this column to our input
offsetOnInput := p.Source.FindCol(ctx, expr, false)
if offsetOnInput >= 0 {
// if we are not getting this from the source, we can solve this at offset planning time
inputOffset := p.Source.AddWSColumn(ctx, offsetOnInput, false)
pe.Info = Offset(inputOffset)
}

return p.addProjExpr(pe)
}

func (p *Projection) AddColumn(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy bool, ae *sqlparser.AliasedExpr) int {
return p.addColumn(ctx, reuse, addToGroupBy, ae, true)
}
Expand Down
Loading

0 comments on commit 11c8d3e

Please sign in to comment.