Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure derived table column names are handled correctly #15588

Merged
merged 26 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
caba63d
feat: WSColumn implementation for Projection
systay Mar 28, 2024
6a9c845
feat: first use of AddWSColumn
systay Mar 28, 2024
475c581
feat: support ws columns by offset on Filter and Subquery
systay Mar 28, 2024
be1d09f
feat: AddWSColumn for HashJoin
systay Mar 28, 2024
a77cdc0
feat: add support for LIMIT
systay Mar 28, 2024
6737be6
feat: AddWSColumn for aggregator
systay Mar 28, 2024
d9958e9
feat: make UNION both use and implement WS by offset
systay Mar 28, 2024
e339adc
feat: make ApplyJoin handle WS columns by offset
systay Mar 28, 2024
74de6df
feat: implement AddWSColumn on more pass-through ops
systay Mar 28, 2024
7cb3350
feat: implemented WS by offset on the remaining ops except Route
systay Mar 28, 2024
ed7b629
minor fixes
systay Apr 19, 2024
13af80a
fix: pushing of ORDER BY by recursive deps, since we can split a deri…
systay Apr 9, 2024
e4fb78d
test: nsfw
systay Apr 19, 2024
32cb4ed
chore: minor fixes
systay Apr 22, 2024
d6edec7
feat: make route support AddWSColumn
systay Apr 22, 2024
a228062
refactor: remove AddWSColumn harness
systay Apr 22, 2024
90057f7
refactor: re-use existing method
systay Apr 22, 2024
e851d33
fix: handle UNION column dependencies correctly
systay Apr 23, 2024
3dd0f95
feat: allow for ColNames to not have a direct dependency
systay Apr 23, 2024
c3ce94a
feat: simplify push-down logic
systay Apr 23, 2024
9e76ac6
fix: don't project expressions if not needed
systay Apr 23, 2024
7a9fd3b
feat: only pass through ColNames through derived tables
systay Apr 26, 2024
51c9098
fix: properly expose columns through derived tables
systay Apr 26, 2024
a55df3f
chore: minor cleanups
systay Apr 26, 2024
ed9b527
chore: clean-up of code after review
systay Apr 29, 2024
340956c
Merge remote-tracking branch 'upstream/main' into derived-table-colum…
systay Apr 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,66 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gro
return offset
}

func (a *Aggregator) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
if len(a.Columns) <= offset {
panic(vterrors.VT13001("offset out of range"))
}

var expr sqlparser.Expr
// first search for the offset among the groupings
for i, by := range a.Grouping {
if by.ColOffset != offset {
continue
}
if by.WSOffset >= 0 {
// ah, we already have a weigh_string for this column. let's return it as is
return by.WSOffset
}

// we need to add a WS column
a.Grouping[i].WSOffset = len(a.Columns)
expr = a.Columns[offset].Expr
break
}

if expr == nil {
for _, aggr := range a.Aggregations {
if aggr.ColOffset != offset {
continue
}
if aggr.WSOffset >= 0 {
// ah, we already have a weigh_string for this column. let's return it as is
return aggr.WSOffset
}

panic(vterrors.VT13001("did not expect to get here"))
systay marked this conversation as resolved.
Show resolved Hide resolved
}
}

if expr == nil {
panic(vterrors.VT13001("could not find group by"))
}

wsExpr := weightStringFor(expr)
wsAe := aeWrap(wsExpr)

wsOffset := len(a.Columns)
a.Columns = append(a.Columns, wsAe)
if underRoute {
// if we are under a route, we are done here.
// the column will be use when creating the query to send to the tablet, and that is all we need
return wsOffset
}

incomingOffset := a.Source.AddWSColumn(ctx, offset, false)

if wsOffset != incomingOffset {
// TODO: we could handle this case by adding a projection on under the aggregator to make the columns line up
panic(errFailedToPlan(wsAe))
}
return wsOffset
}

func (a *Aggregator) findColInternal(ctx *plancontext.PlanningContext, ae *sqlparser.AliasedExpr, addToGroupBy bool) int {
expr := ae.Expr
offset := a.FindCol(ctx, expr, false)
Expand All @@ -211,8 +271,19 @@ func (a *Aggregator) findColInternal(ctx *plancontext.PlanningContext, ae *sqlpa
return -1
}

func isDerived(op Operator) bool {
switch op := op.(type) {
case *Horizon:
return op.IsDerived()
case selectExpressions:
return op.derivedName() != ""
default:
return false
}
}

func (a *Aggregator) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
if _, isSourceDerived := a.Source.(*Horizon); isSourceDerived {
if isDerived(a.Source) {
return a.Columns
}

Expand Down Expand Up @@ -278,6 +349,7 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) Operator {
if gb.ColOffset == -1 {
offset := a.internalAddColumn(ctx, aeWrap(gb.Inner), false)
a.Grouping[idx].ColOffset = offset
gb.ColOffset = offset
}
if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.Inner) {
continue
Expand Down
72 changes: 59 additions & 13 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,22 +237,52 @@ func (aj *ApplyJoin) AddColumn(
return offset
}

func (aj *ApplyJoin) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
if len(aj.Columns) == 0 {
aj.planOffsets(ctx)
}

if len(aj.Columns) <= offset {
panic(vterrors.VT13001("offset out of range"))
}

wsExpr := weightStringFor(aj.JoinColumns.columns[offset].Original)
if index := aj.FindCol(ctx, wsExpr, false); index != -1 {
// nice, we already have this column. no need to add anything
return index
}

i := aj.Columns[offset]
out := 0
if i < 0 {
out = aj.LHS.AddWSColumn(ctx, FromLeftOffset(i), underRoute)
out = ToLeftOffset(out)
aj.JoinColumns.addLeft(wsExpr)
} else {
out = aj.RHS.AddWSColumn(ctx, FromRightOffset(i), underRoute)
out = ToRightOffset(out)
aj.JoinColumns.addRight(wsExpr)
}

if out >= 0 {
aj.addOffset(out)
} else {
col := aj.getJoinColumnFor(ctx, aeWrap(wsExpr), wsExpr, !ContainsAggr(ctx, wsExpr))
aj.JoinColumns.add(col)
aj.planOffsetFor(ctx, col)
}

return len(aj.Columns) - 1
}

func (aj *ApplyJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
if len(aj.Columns) > 0 {
// we've already done offset planning
return aj
}
for _, col := range aj.JoinColumns.columns {
// Read the type description for applyJoinColumn to understand the following code
for _, lhsExpr := range col.LHSExprs {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(lhsExpr.Expr))
if col.RHSExpr == nil {
// if we don't have an RHS expr, it means that this is a pure LHS expression
aj.addOffset(-offset - 1)
} else {
aj.Vars[lhsExpr.Name] = offset
}
}
if col.RHSExpr != nil {
offset := aj.RHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.RHSExpr))
aj.addOffset(offset + 1)
}
aj.planOffsetFor(ctx, col)
}

for _, col := range aj.JoinPredicates.columns {
Expand All @@ -270,6 +300,22 @@ func (aj *ApplyJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
return nil
}

func (aj *ApplyJoin) planOffsetFor(ctx *plancontext.PlanningContext, col applyJoinColumn) {
for _, lhsExpr := range col.LHSExprs {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(lhsExpr.Expr))
if col.RHSExpr == nil {
// if we don't have an RHS expr, it means that this is a pure LHS expression
aj.addOffset(ToLeftOffset(offset))
} else {
aj.Vars[lhsExpr.Name] = offset
}
}
if col.RHSExpr != nil {
offset := aj.RHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.RHSExpr))
aj.addOffset(ToRightOffset(offset))
}
}

func (aj *ApplyJoin) addOffset(offset int) {
aj.Columns = append(aj.Columns, offset)
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (l *LockAndComment) AddColumn(ctx *plancontext.PlanningContext, reuseExisti
return l.Source.AddColumn(ctx, reuseExisting, addToGroupBy, expr)
}

func (l *LockAndComment) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return l.Source.AddWSColumn(ctx, offset, underRoute)
}

func (l *LockAndComment) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return l.Source.FindCol(ctx, expr, underRoute)
}
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,11 @@ func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) Operator {
for idx, col := range columns {
e := col.Expr
var wsCol *int
typ, _ := ctx.SemTable.TypeForExpr(e)

if ctx.SemTable.NeedsWeightString(e) {
offset := d.Source.AddColumn(ctx, true, false, aeWrap(weightStringFor(e)))
offset := d.Source.AddWSColumn(ctx, idx, false)
wsCol = &offset
}

typ, _ := ctx.SemTable.TypeForExpr(e)
d.Columns = append(d.Columns, engine.CheckCol{
Col: idx,
WsCol: wsCol,
Expand Down Expand Up @@ -94,6 +92,9 @@ func (d *Distinct) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser
func (d *Distinct) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool, expr *sqlparser.AliasedExpr) int {
return d.Source.AddColumn(ctx, reuse, gb, expr)
}
func (d *Distinct) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return d.Source.AddWSColumn(ctx, offset, underRoute)
}

func (d *Distinct) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return d.Source.FindCol(ctx, expr, underRoute)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (f *Filter) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr,
return f.Source.FindCol(ctx, expr, underRoute)
}

func (f *Filter) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return f.Source.AddWSColumn(ctx, offset, underRoute)
}

func (f *Filter) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return f.Source.GetColumns(ctx)
}
Expand Down
42 changes: 42 additions & 0 deletions go/vt/vtgate/planbuilder/operators/hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,32 @@ func (hj *HashJoin) AddColumn(ctx *plancontext.PlanningContext, reuseExisting bo
return len(hj.columns.columns) - 1
}

func (hj *HashJoin) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
hj.planOffsets(ctx)

if len(hj.ColumnOffsets) <= offset {
panic(vterrors.VT13001("offset out of range"))
}

// check if it already exists
wsExpr := weightStringFor(hj.columns.columns[offset].expr)
if index := hj.FindCol(ctx, wsExpr, false); index != -1 {
return index
}

i := hj.ColumnOffsets[offset]
out := 0
if i < 0 {
out = hj.LHS.AddWSColumn(ctx, FromLeftOffset(i), underRoute)
out = ToLeftOffset(out)
} else {
out = hj.RHS.AddWSColumn(ctx, FromRightOffset(i), underRoute)
out = ToRightOffset(out)
}
hj.ColumnOffsets = append(hj.ColumnOffsets, out)
return len(hj.ColumnOffsets) - 1
}

func (hj *HashJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
if hj.offset {
return nil
Expand Down Expand Up @@ -450,3 +476,19 @@ func (hj *HashJoin) addSingleSidedColumn(
Info: &EvalEngine{EExpr: eexpr},
}, isPureOffset
}

func FromLeftOffset(i int) int {
return -i - 1
}

func ToLeftOffset(i int) int {
return -i - 1
}

func FromRightOffset(i int) int {
return i - 1
}

func ToRightOffset(i int) int {
return i + 1
}
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (h *Horizon) AddColumn(ctx *plancontext.PlanningContext, reuse bool, _ bool
return offset
}

func (h *Horizon) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
panic(errNoNewColumns)
}

var errNoNewColumns = vterrors.VT13001("can't add new columns to Horizon")

// canReuseColumn is generic, so it can be used with slices of different types.
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (l *Limit) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool,
return l.Source.AddColumn(ctx, reuse, gb, expr)
}

func (l *Limit) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return l.Source.AddWSColumn(ctx, offset, underRoute)
}

func (l *Limit) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return l.Source.FindCol(ctx, expr, underRoute)
}
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package operators
import (
"fmt"

"vitess.io/vitess/go/vt/vtgate/engine/opcode"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine/opcode"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/semantics"
)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/planbuilder/operators/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type (

AddColumn(ctx *plancontext.PlanningContext, reuseExisting bool, addToGroupBy bool, expr *sqlparser.AliasedExpr) int

// AddWSColumn is used to add a weight_string column to the operator
AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int

FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int

GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (o *Ordering) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bo
return o.Source.AddColumn(ctx, reuse, gb, expr)
}

func (o *Ordering) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return o.Source.AddWSColumn(ctx, offset, underRoute)
}

func (o *Ordering) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return o.Source.FindCol(ctx, expr, underRoute)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p Phase) String() string {
case initialPlanning:
return "initial horizon planning optimization"
case pullDistinctFromUnion:
return "pull distinct from UNION1"
return "pull distinct from UNION"
case delegateAggregation:
return "split aggregation between vtgate and mysql"
case addAggrOrdering:
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/plan_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (noColumns) AddColumn(*plancontext.PlanningContext, bool, bool, *sqlparser.
panic(vterrors.VT13001("noColumns operators have no column"))
}

func (noColumns) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
panic(vterrors.VT13001("noColumns operators have no column"))
}

func (noColumns) GetColumns(*plancontext.PlanningContext) []*sqlparser.AliasedExpr {
panic(vterrors.VT13001("noColumns operators have no column"))
}
Expand Down
Loading
Loading