Skip to content

Commit

Permalink
Optimize Operator Input Handling (#16689)
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay authored Sep 3, 2024
1 parent 44e96c2 commit d276007
Show file tree
Hide file tree
Showing 48 changed files with 531 additions and 675 deletions.
12 changes: 6 additions & 6 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ func transformToPrimitive(ctx *plancontext.PlanningContext, op operators.Operato
}

func transformPercentBasedMirror(ctx *plancontext.PlanningContext, op *operators.PercentBasedMirror) (engine.Primitive, error) {
primitive, err := transformToPrimitive(ctx, op.Operator)
primitive, err := transformToPrimitive(ctx, op.Operator())
if err != nil {
return nil, err
}

target, err := transformToPrimitive(ctx.UseMirror(), op.Target)
target, err := transformToPrimitive(ctx.UseMirror(), op.Target())
// Mirroring is best-effort. If we encounter an error while building the
// mirror target primitive, proceed without mirroring.
if err != nil {
Expand Down Expand Up @@ -169,7 +169,7 @@ func transformSequential(ctx *plancontext.PlanningContext, op *operators.Sequent
}

func transformInsertionSelection(ctx *plancontext.PlanningContext, op *operators.InsertSelection) (engine.Primitive, error) {
rb, isRoute := op.Insert.(*operators.Route)
rb, isRoute := op.Insert().(*operators.Route)
if !isRoute {
return nil, vterrors.VT13001(fmt.Sprintf("Incorrect type encountered: %T (transformInsertionSelection)", op.Insert))
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func transformInsertionSelection(ctx *plancontext.PlanningContext, op *operators

eins.Prefix, _, eins.Suffix = generateInsertShardedQuery(ins.AST)

selectionPlan, err := transformToPrimitive(ctx, op.Select)
selectionPlan, err := transformToPrimitive(ctx, op.Select())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1000,11 +1000,11 @@ func transformVindexPlan(ctx *plancontext.PlanningContext, op *operators.Vindex)
}

func transformRecurseCTE(ctx *plancontext.PlanningContext, op *operators.RecurseCTE) (engine.Primitive, error) {
seed, err := transformToPrimitive(ctx, op.Seed)
seed, err := transformToPrimitive(ctx, op.Seed())
if err != nil {
return nil, err
}
term, err := transformToPrimitive(ctx, op.Term)
term, err := transformToPrimitive(ctx, op.Term())
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/SQL_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,9 +717,9 @@ func buildRecursiveCTE(op *RecurseCTE, qb *queryBuilder) {
return jc.Original
})
pred := sqlparser.AndExpressions(predicates...)
buildQuery(op.Seed, qb)
buildQuery(op.Seed(), qb)
qbR := &queryBuilder{ctx: qb.ctx}
buildQuery(op.Term, qbR)
buildQuery(op.Term(), qbR)
qbR.addPredicate(pred)
infoFor, err := qb.ctx.SemTable.TableInfoFor(op.OuterID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ func createJoinPusher(rootAggr *Aggregator, operator Operator) *joinPusher {
return &joinPusher{
orig: rootAggr,
pushed: &Aggregator{
Source: operator,
QP: rootAggr.QP,
unaryOperator: newUnaryOp(operator),
QP: rootAggr.QP,
},
columns: initColReUse(len(rootAggr.Columns)),
tableID: TableID(operator),
Expand Down
10 changes: 1 addition & 9 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type (
// Both all aggregations and no grouping, and the inverse
// of all grouping and no aggregations are valid configurations of this operator
Aggregator struct {
Source Operator
unaryOperator
Columns []*sqlparser.AliasedExpr

WithRollup bool
Expand Down Expand Up @@ -75,14 +75,6 @@ func (a *Aggregator) Clone(inputs []Operator) Operator {
return &kopy
}

func (a *Aggregator) Inputs() []Operator {
return []Operator{a.Source}
}

func (a *Aggregator) SetInputs(operators []Operator) {
a.Source = operators[0]
}

func (a *Aggregator) AddPredicate(_ *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
return newFilter(a, expr)
}
Expand Down
15 changes: 2 additions & 13 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type (
// ApplyJoin is a nested loop join - for each row on the LHS,
// we'll execute the plan on the RHS, feeding data from left to right
ApplyJoin struct {
LHS, RHS Operator
binaryOperator

// JoinType is permitted to store only 3 of the possible values
// NormalJoinType, StraightJoinType and LeftJoinType.
Expand Down Expand Up @@ -85,8 +85,7 @@ type (

func NewApplyJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, predicate sqlparser.Expr, joinType sqlparser.JoinType) *ApplyJoin {
aj := &ApplyJoin{
LHS: lhs,
RHS: rhs,
binaryOperator: newBinaryOp(lhs, rhs),
Vars: map[string]int{},
JoinType: joinType,
JoinColumns: &applyJoinColumns{},
Expand All @@ -113,16 +112,6 @@ func (aj *ApplyJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlpars
return AddPredicate(ctx, aj, expr, false, newFilterSinglePredicate)
}

// Inputs implements the Operator interface
func (aj *ApplyJoin) Inputs() []Operator {
return []Operator{aj.LHS, aj.RHS}
}

// SetInputs implements the Operator interface
func (aj *ApplyJoin) SetInputs(inputs []Operator) {
aj.LHS, aj.RHS = inputs[0], inputs[1]
}

func (aj *ApplyJoin) GetLHS() Operator {
return aj.LHS
}
Expand Down
27 changes: 16 additions & 11 deletions go/vt/vtgate/planbuilder/operators/ast_to_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,10 @@ func translateQueryToOpWithMirroring(ctx *plancontext.PlanningContext, stmt sqlp
func createOperatorFromSelect(ctx *plancontext.PlanningContext, sel *sqlparser.Select) Operator {
op := crossJoin(ctx, sel.From)

if sel.Where != nil {
op = addWherePredicates(ctx, sel.Where.Expr, op)
}
op = addWherePredicates(ctx, sel.GetWherePredicate(), op)

if sel.Comments != nil || sel.Lock != sqlparser.NoLock {
op = &LockAndComment{
Source: op,
Comments: sel.Comments,
Lock: sel.Lock,
}
op = newLockAndComment(op, sel.Comments, sel.Lock)
}

op = newHorizon(op, sel)
Expand All @@ -88,15 +82,26 @@ func addWherePredicates(ctx *plancontext.PlanningContext, expr sqlparser.Expr, o
return sqc.getRootOperator(op, nil)
}

func addWherePredsToSubQueryBuilder(ctx *plancontext.PlanningContext, expr sqlparser.Expr, op Operator, sqc *SubQueryBuilder) Operator {
func addWherePredsToSubQueryBuilder(ctx *plancontext.PlanningContext, in sqlparser.Expr, op Operator, sqc *SubQueryBuilder) Operator {
outerID := TableID(op)
exprs := sqlparser.SplitAndExpression(nil, expr)
for _, expr := range exprs {
for _, expr := range sqlparser.SplitAndExpression(nil, in) {
sqlparser.RemoveKeyspaceInCol(expr)
expr = simplifyPredicates(ctx, expr)
subq := sqc.handleSubquery(ctx, expr, outerID)
if subq != nil {
continue
}
boolean := ctx.IsConstantBool(expr)
if boolean != nil {
if *boolean {
// If the predicate is true, we can ignore it.
continue
}

// If the predicate is false, we push down a false predicate to influence routing
expr = sqlparser.NewIntLiteral("0")
}

op = op.AddPredicate(ctx, expr)
addColumnEquality(ctx, expr)
}
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vtgate/planbuilder/operators/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@ import (

// LockAndComment contains any comments or locking directives we want on all queries down from this operator
type LockAndComment struct {
Source Operator
unaryOperator
Comments *sqlparser.ParsedComments
Lock sqlparser.Lock
}

func newLockAndComment(op Operator, comments *sqlparser.ParsedComments, lock sqlparser.Lock) Operator {
return &LockAndComment{
unaryOperator: newUnaryOp(op),
Comments: comments,
Lock: lock,
}
}

func (l *LockAndComment) Clone(inputs []Operator) Operator {
klon := *l
klon.Source = inputs[0]
return &klon
}

func (l *LockAndComment) Inputs() []Operator {
return []Operator{l.Source}
}

func (l *LockAndComment) SetInputs(operators []Operator) {
l.Source = operators[0]
}

func (l *LockAndComment) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
l.Source = l.Source.AddPredicate(ctx, expr)
return l
Expand Down
24 changes: 12 additions & 12 deletions go/vt/vtgate/planbuilder/operators/cte_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func tryMergeRecurse(ctx *plancontext.PlanningContext, in *RecurseCTE) (Operator, *ApplyResult) {
op := tryMergeCTE(ctx, in.Seed, in.Term, in)
op := tryMergeCTE(ctx, in.Seed(), in.Term(), in)
if op == nil {
return in, NoRewrite
}
Expand Down Expand Up @@ -79,17 +79,17 @@ func mergeCTE(ctx *plancontext.PlanningContext, seed, term *Route, r Routing, in
hz := in.Horizon
hz.Source = term.Source
newTerm, _ := expandHorizon(ctx, hz)
cte := &RecurseCTE{
binaryOperator: newBinaryOp(seed.Source, newTerm),
Predicates: in.Predicates,
Def: in.Def,
LeftID: in.LeftID,
OuterID: in.OuterID,
Distinct: in.Distinct,
}
return &Route{
Routing: r,
Source: &RecurseCTE{
Predicates: in.Predicates,
Def: in.Def,
Seed: seed.Source,
Term: newTerm,
LeftID: in.LeftID,
OuterID: in.OuterID,
Distinct: in.Distinct,
},
MergedWith: []*Route{term},
Routing: r,
unaryOperator: newUnaryOp(cte),
MergedWith: []*Route{term},
}
}
17 changes: 4 additions & 13 deletions go/vt/vtgate/planbuilder/operators/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ func createOperatorFromDelete(ctx *plancontext.PlanningContext, deleteStmt *sqlp
op, vTbl = createDeleteOperator(ctx, deleteStmt)

if deleteStmt.Comments != nil {
op = &LockAndComment{
Source: op,
Comments: deleteStmt.Comments,
}
op = newLockAndComment(op, deleteStmt.Comments, sqlparser.NoLock)
}

var err error
Expand Down Expand Up @@ -151,10 +148,7 @@ func createDeleteWithInputOp(ctx *plancontext.PlanningContext, del *sqlparser.De
}

if del.Comments != nil {
op = &LockAndComment{
Source: op,
Comments: del.Comments,
}
op = newLockAndComment(op, del.Comments, sqlparser.NoLock)
}
return op
}
Expand Down Expand Up @@ -261,10 +255,7 @@ func createDeleteOperator(ctx *plancontext.PlanningContext, del *sqlparser.Delet
}

if del.Limit != nil {
delOp.Source = &Limit{
Source: addOrdering(ctx, op, del.OrderBy),
AST: del.Limit,
}
delOp.Source = newLimit(addOrdering(ctx, op, del.OrderBy), del.Limit, false)
} else {
delOp.Source = op
}
Expand Down Expand Up @@ -316,7 +307,7 @@ func addOrdering(ctx *plancontext.PlanningContext, op Operator, orderBy sqlparse
if len(order) == 0 {
return op
}
return &Ordering{Source: op, Order: order}
return newOrdering(op, order)
}

func updateQueryGraphWithSource(ctx *plancontext.PlanningContext, input Operator, tblID semantics.TableSet, vTbl *vindexes.Table) *vindexes.Table {
Expand Down
32 changes: 14 additions & 18 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

type (
Distinct struct {
Source Operator
QP *QueryProjection
unaryOperator
QP *QueryProjection

// When we go from AST to operator, we place DISTINCT ops in the required places in the op tree
// These are marked as `Required`, because they are semantically important to the results of the query.
Expand All @@ -45,6 +45,14 @@ type (
}
)

func newDistinct(src Operator, qp *QueryProjection, required bool) *Distinct {
return &Distinct{
unaryOperator: newUnaryOp(src),
QP: qp,
Required: required,
}
}

func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) Operator {
columns := d.GetColumns(ctx)
for idx, col := range columns {
Expand All @@ -66,22 +74,10 @@ func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) Operator {
}

func (d *Distinct) Clone(inputs []Operator) Operator {
return &Distinct{
Required: d.Required,
Source: inputs[0],
Columns: slices.Clone(d.Columns),
QP: d.QP,
PushedPerformance: d.PushedPerformance,
ResultColumns: d.ResultColumns,
}
}

func (d *Distinct) Inputs() []Operator {
return []Operator{d.Source}
}

func (d *Distinct) SetInputs(operators []Operator) {
d.Source = operators[0]
kopy := *d
kopy.Columns = slices.Clone(d.Columns)
kopy.Source = inputs[0]
return &kopy
}

func (d *Distinct) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
Expand Down
Loading

0 comments on commit d276007

Please sign in to comment.