Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Operator Input Handling #16689

Merged
merged 14 commits into from
Sep 3, 2024
12 changes: 6 additions & 6 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ func transformToPrimitive(ctx *plancontext.PlanningContext, op operators.Operato
}

func transformPercentBasedMirror(ctx *plancontext.PlanningContext, op *operators.PercentBasedMirror) (engine.Primitive, error) {
primitive, err := transformToPrimitive(ctx, op.Operator)
primitive, err := transformToPrimitive(ctx, op.Operator())
if err != nil {
return nil, err
}

target, err := transformToPrimitive(ctx.UseMirror(), op.Target)
target, err := transformToPrimitive(ctx.UseMirror(), op.Target())
// Mirroring is best-effort. If we encounter an error while building the
// mirror target primitive, proceed without mirroring.
if err != nil {
Expand Down Expand Up @@ -169,7 +169,7 @@ func transformSequential(ctx *plancontext.PlanningContext, op *operators.Sequent
}

func transformInsertionSelection(ctx *plancontext.PlanningContext, op *operators.InsertSelection) (engine.Primitive, error) {
rb, isRoute := op.Insert.(*operators.Route)
rb, isRoute := op.Insert().(*operators.Route)
if !isRoute {
return nil, vterrors.VT13001(fmt.Sprintf("Incorrect type encountered: %T (transformInsertionSelection)", op.Insert))
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func transformInsertionSelection(ctx *plancontext.PlanningContext, op *operators

eins.Prefix, _, eins.Suffix = generateInsertShardedQuery(ins.AST)

selectionPlan, err := transformToPrimitive(ctx, op.Select)
selectionPlan, err := transformToPrimitive(ctx, op.Select())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1000,11 +1000,11 @@ func transformVindexPlan(ctx *plancontext.PlanningContext, op *operators.Vindex)
}

func transformRecurseCTE(ctx *plancontext.PlanningContext, op *operators.RecurseCTE) (engine.Primitive, error) {
seed, err := transformToPrimitive(ctx, op.Seed)
seed, err := transformToPrimitive(ctx, op.Seed())
if err != nil {
return nil, err
}
term, err := transformToPrimitive(ctx, op.Term)
term, err := transformToPrimitive(ctx, op.Term())
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/SQL_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,9 +717,9 @@ func buildRecursiveCTE(op *RecurseCTE, qb *queryBuilder) {
return jc.Original
})
pred := sqlparser.AndExpressions(predicates...)
buildQuery(op.Seed, qb)
buildQuery(op.Seed(), qb)
qbR := &queryBuilder{ctx: qb.ctx}
buildQuery(op.Term, qbR)
buildQuery(op.Term(), qbR)
qbR.addPredicate(pred)
infoFor, err := qb.ctx.SemTable.TableInfoFor(op.OuterID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ func createJoinPusher(rootAggr *Aggregator, operator Operator) *joinPusher {
return &joinPusher{
orig: rootAggr,
pushed: &Aggregator{
Source: operator,
QP: rootAggr.QP,
unaryOperator: newUnaryOp(operator),
QP: rootAggr.QP,
},
columns: initColReUse(len(rootAggr.Columns)),
tableID: TableID(operator),
Expand Down
10 changes: 1 addition & 9 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type (
// Both all aggregations and no grouping, and the inverse
// of all grouping and no aggregations are valid configurations of this operator
Aggregator struct {
Source Operator
unaryOperator
Columns []*sqlparser.AliasedExpr

WithRollup bool
Expand Down Expand Up @@ -75,14 +75,6 @@ func (a *Aggregator) Clone(inputs []Operator) Operator {
return &kopy
}

func (a *Aggregator) Inputs() []Operator {
return []Operator{a.Source}
}

func (a *Aggregator) SetInputs(operators []Operator) {
a.Source = operators[0]
}

func (a *Aggregator) AddPredicate(_ *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
return newFilter(a, expr)
}
Expand Down
15 changes: 2 additions & 13 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type (
// ApplyJoin is a nested loop join - for each row on the LHS,
// we'll execute the plan on the RHS, feeding data from left to right
ApplyJoin struct {
LHS, RHS Operator
binaryOperator

// JoinType is permitted to store only 3 of the possible values
// NormalJoinType, StraightJoinType and LeftJoinType.
Expand Down Expand Up @@ -85,8 +85,7 @@ type (

func NewApplyJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, predicate sqlparser.Expr, joinType sqlparser.JoinType) *ApplyJoin {
aj := &ApplyJoin{
LHS: lhs,
RHS: rhs,
binaryOperator: newBinaryOp(lhs, rhs),
Vars: map[string]int{},
JoinType: joinType,
JoinColumns: &applyJoinColumns{},
Expand All @@ -113,16 +112,6 @@ func (aj *ApplyJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlpars
return AddPredicate(ctx, aj, expr, false, newFilterSinglePredicate)
}

// Inputs implements the Operator interface
func (aj *ApplyJoin) Inputs() []Operator {
return []Operator{aj.LHS, aj.RHS}
}

// SetInputs implements the Operator interface
func (aj *ApplyJoin) SetInputs(inputs []Operator) {
aj.LHS, aj.RHS = inputs[0], inputs[1]
}

func (aj *ApplyJoin) GetLHS() Operator {
return aj.LHS
}
Expand Down
27 changes: 16 additions & 11 deletions go/vt/vtgate/planbuilder/operators/ast_to_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,10 @@ func translateQueryToOpWithMirroring(ctx *plancontext.PlanningContext, stmt sqlp
func createOperatorFromSelect(ctx *plancontext.PlanningContext, sel *sqlparser.Select) Operator {
op := crossJoin(ctx, sel.From)

if sel.Where != nil {
op = addWherePredicates(ctx, sel.Where.Expr, op)
}
op = addWherePredicates(ctx, sel.GetWherePredicate(), op)

if sel.Comments != nil || sel.Lock != sqlparser.NoLock {
op = &LockAndComment{
Source: op,
Comments: sel.Comments,
Lock: sel.Lock,
}
op = newLockAndComment(op, sel.Comments, sel.Lock)
}

op = newHorizon(op, sel)
Expand All @@ -88,15 +82,26 @@ func addWherePredicates(ctx *plancontext.PlanningContext, expr sqlparser.Expr, o
return sqc.getRootOperator(op, nil)
}

func addWherePredsToSubQueryBuilder(ctx *plancontext.PlanningContext, expr sqlparser.Expr, op Operator, sqc *SubQueryBuilder) Operator {
func addWherePredsToSubQueryBuilder(ctx *plancontext.PlanningContext, in sqlparser.Expr, op Operator, sqc *SubQueryBuilder) Operator {
outerID := TableID(op)
exprs := sqlparser.SplitAndExpression(nil, expr)
for _, expr := range exprs {
for _, expr := range sqlparser.SplitAndExpression(nil, in) {
sqlparser.RemoveKeyspaceInCol(expr)
expr = simplifyPredicates(ctx, expr)
subq := sqc.handleSubquery(ctx, expr, outerID)
if subq != nil {
continue
}
boolean := ctx.IsConstantBool(expr)
if boolean != nil {
if *boolean {
// If the predicate is true, we can ignore it.
continue
}

// If the predicate is false, we push down a false predicate to influence routing
expr = sqlparser.NewIntLiteral("0")
}

op = op.AddPredicate(ctx, expr)
addColumnEquality(ctx, expr)
}
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vtgate/planbuilder/operators/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@ import (

// LockAndComment contains any comments or locking directives we want on all queries down from this operator
type LockAndComment struct {
Source Operator
unaryOperator
Comments *sqlparser.ParsedComments
Lock sqlparser.Lock
}

func newLockAndComment(op Operator, comments *sqlparser.ParsedComments, lock sqlparser.Lock) Operator {
return &LockAndComment{
unaryOperator: newUnaryOp(op),
Comments: comments,
Lock: lock,
}
}

func (l *LockAndComment) Clone(inputs []Operator) Operator {
klon := *l
klon.Source = inputs[0]
return &klon
}

func (l *LockAndComment) Inputs() []Operator {
return []Operator{l.Source}
}

func (l *LockAndComment) SetInputs(operators []Operator) {
l.Source = operators[0]
}

func (l *LockAndComment) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
l.Source = l.Source.AddPredicate(ctx, expr)
return l
Expand Down
24 changes: 12 additions & 12 deletions go/vt/vtgate/planbuilder/operators/cte_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func tryMergeRecurse(ctx *plancontext.PlanningContext, in *RecurseCTE) (Operator, *ApplyResult) {
op := tryMergeCTE(ctx, in.Seed, in.Term, in)
op := tryMergeCTE(ctx, in.Seed(), in.Term(), in)
if op == nil {
return in, NoRewrite
}
Expand Down Expand Up @@ -79,17 +79,17 @@ func mergeCTE(ctx *plancontext.PlanningContext, seed, term *Route, r Routing, in
hz := in.Horizon
hz.Source = term.Source
newTerm, _ := expandHorizon(ctx, hz)
cte := &RecurseCTE{
binaryOperator: newBinaryOp(seed.Source, newTerm),
Predicates: in.Predicates,
Def: in.Def,
LeftID: in.LeftID,
OuterID: in.OuterID,
Distinct: in.Distinct,
}
return &Route{
Routing: r,
Source: &RecurseCTE{
Predicates: in.Predicates,
Def: in.Def,
Seed: seed.Source,
Term: newTerm,
LeftID: in.LeftID,
OuterID: in.OuterID,
Distinct: in.Distinct,
},
MergedWith: []*Route{term},
Routing: r,
unaryOperator: newUnaryOp(cte),
MergedWith: []*Route{term},
}
}
17 changes: 4 additions & 13 deletions go/vt/vtgate/planbuilder/operators/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ func createOperatorFromDelete(ctx *plancontext.PlanningContext, deleteStmt *sqlp
op, vTbl = createDeleteOperator(ctx, deleteStmt)

if deleteStmt.Comments != nil {
op = &LockAndComment{
Source: op,
Comments: deleteStmt.Comments,
}
op = newLockAndComment(op, deleteStmt.Comments, sqlparser.NoLock)
}

var err error
Expand Down Expand Up @@ -151,10 +148,7 @@ func createDeleteWithInputOp(ctx *plancontext.PlanningContext, del *sqlparser.De
}

if del.Comments != nil {
op = &LockAndComment{
Source: op,
Comments: del.Comments,
}
op = newLockAndComment(op, del.Comments, sqlparser.NoLock)
}
return op
}
Expand Down Expand Up @@ -261,10 +255,7 @@ func createDeleteOperator(ctx *plancontext.PlanningContext, del *sqlparser.Delet
}

if del.Limit != nil {
delOp.Source = &Limit{
Source: addOrdering(ctx, op, del.OrderBy),
AST: del.Limit,
}
delOp.Source = newLimit(addOrdering(ctx, op, del.OrderBy), del.Limit, false)
} else {
delOp.Source = op
}
Expand Down Expand Up @@ -316,7 +307,7 @@ func addOrdering(ctx *plancontext.PlanningContext, op Operator, orderBy sqlparse
if len(order) == 0 {
return op
}
return &Ordering{Source: op, Order: order}
return newOrdering(op, order)
}

func updateQueryGraphWithSource(ctx *plancontext.PlanningContext, input Operator, tblID semantics.TableSet, vTbl *vindexes.Table) *vindexes.Table {
Expand Down
32 changes: 14 additions & 18 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

type (
Distinct struct {
Source Operator
QP *QueryProjection
unaryOperator
QP *QueryProjection

// When we go from AST to operator, we place DISTINCT ops in the required places in the op tree
// These are marked as `Required`, because they are semantically important to the results of the query.
Expand All @@ -45,6 +45,14 @@ type (
}
)

func newDistinct(src Operator, qp *QueryProjection, required bool) *Distinct {
return &Distinct{
unaryOperator: newUnaryOp(src),
QP: qp,
Required: required,
}
}

func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) Operator {
columns := d.GetColumns(ctx)
for idx, col := range columns {
Expand All @@ -66,22 +74,10 @@ func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) Operator {
}

func (d *Distinct) Clone(inputs []Operator) Operator {
return &Distinct{
Required: d.Required,
Source: inputs[0],
Columns: slices.Clone(d.Columns),
QP: d.QP,
PushedPerformance: d.PushedPerformance,
ResultColumns: d.ResultColumns,
}
}

func (d *Distinct) Inputs() []Operator {
return []Operator{d.Source}
}

func (d *Distinct) SetInputs(operators []Operator) {
d.Source = operators[0]
kopy := *d
kopy.Columns = slices.Clone(d.Columns)
kopy.Source = inputs[0]
return &kopy
}

func (d *Distinct) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
Expand Down
Loading
Loading