Skip to content

Commit

Permalink
refactor: panic instead of error for merging logic
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Oct 10, 2023
1 parent 5476331 commit ac9d992
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 61 deletions.
7 changes: 3 additions & 4 deletions go/vt/vtgate/planbuilder/operators/info_schema_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func isTableOrSchemaRoutable(cmp *sqlparser.ComparisonExpr) (
return false, nil
}

func tryMergeInfoSchemaRoutings(ctx *plancontext.PlanningContext, routingA, routingB Routing, m merger, lhsRoute, rhsRoute *Route) (*Route, error) {
func tryMergeInfoSchemaRoutings(ctx *plancontext.PlanningContext, routingA, routingB Routing, m merger, lhsRoute, rhsRoute *Route) *Route {
// we have already checked type earlier, so this should always be safe
isrA := routingA.(*InfoSchemaRouting)
isrB := routingB.(*InfoSchemaRouting)
Expand All @@ -188,7 +188,7 @@ func tryMergeInfoSchemaRoutings(ctx *plancontext.PlanningContext, routingA, rout
for k, expr := range isrB.SysTableTableName {
if e, found := isrA.SysTableTableName[k]; found && !sqlparser.Equals.Expr(expr, e) {
// schema names are the same, but we have contradicting table names, so we give up
return nil, nil
return nil
}
isrA.SysTableTableName[k] = expr
}
Expand All @@ -203,9 +203,8 @@ func tryMergeInfoSchemaRoutings(ctx *plancontext.PlanningContext, routingA, rout

// give up
default:
return nil, nil
return nil
}

}

var (
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vtgate/planbuilder/operators/join_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
// mergeJoinInputs checks whether two operators can be merged into a single one.
// If they can be merged, a new operator with the merged routing is returned
// If they cannot be merged, nil is returned.
func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, m merger) (*Route, error) {
func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, m merger) *Route {
lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs)
if lhsRoute == nil {
return nil, nil
return nil
}

switch {
Expand Down Expand Up @@ -62,7 +62,7 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo
return tryMergeJoinShardedRouting(ctx, lhsRoute, rhsRoute, m, joinPredicates)

default:
return nil, nil
return nil
}
}

Expand All @@ -87,8 +87,8 @@ func prepareInputRoutes(lhs ops.Operator, rhs ops.Operator) (*Route, *Route, Rou

type (
merger interface {
mergeShardedRouting(ctx *plancontext.PlanningContext, r1, r2 *ShardedRouting, op1, op2 *Route) (*Route, error)
merge(ctx *plancontext.PlanningContext, op1, op2 *Route, r Routing) (*Route, error)
mergeShardedRouting(ctx *plancontext.PlanningContext, r1, r2 *ShardedRouting, op1, op2 *Route) *Route
merge(ctx *plancontext.PlanningContext, op1, op2 *Route, r Routing) *Route
}

joinMerger struct {
Expand Down Expand Up @@ -184,7 +184,7 @@ func newJoinMerge(predicates []sqlparser.Expr, innerJoin bool) merger {
}
}

func (jm *joinMerger) mergeShardedRouting(ctx *plancontext.PlanningContext, r1, r2 *ShardedRouting, op1, op2 *Route) (*Route, error) {
func (jm *joinMerger) mergeShardedRouting(ctx *plancontext.PlanningContext, r1, r2 *ShardedRouting, op1, op2 *Route) *Route {
return jm.merge(ctx, op1, op2, mergeShardedRouting(r1, r2))
}

Expand All @@ -207,10 +207,10 @@ func (jm *joinMerger) getApplyJoin(ctx *plancontext.PlanningContext, op1, op2 *R
return NewApplyJoin(op1.Source, op2.Source, ctx.SemTable.AndExpressions(jm.predicates...), !jm.innerJoin)
}

func (jm *joinMerger) merge(ctx *plancontext.PlanningContext, op1, op2 *Route, r Routing) (*Route, error) {
func (jm *joinMerger) merge(ctx *plancontext.PlanningContext, op1, op2 *Route, r Routing) *Route {
return &Route{
Source: jm.getApplyJoin(ctx, op1, op2),
MergedWith: []*Route{op2},
Routing: r,
}, nil
}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p Phase) act(ctx *plancontext.PlanningContext, op ops.Operator) (ops.Opera
case cleanOutPerfDistinct:
return removePerformanceDistinctAboveRoute(ctx, op)
case subquerySettling:
return settleSubqueries(ctx, op)
return settleSubqueries(ctx, op), nil
}

return op, nil
Expand Down
5 changes: 1 addition & 4 deletions go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,7 @@ func requiresSwitchingSides(ctx *plancontext.PlanningContext, op ops.Operator) b
}

func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, inner bool) (ops.Operator, *rewrite.ApplyResult, error) {
newPlan, err := mergeJoinInputs(ctx, lhs, rhs, joinPredicates, newJoinMerge(joinPredicates, inner))
if err != nil {
return nil, nil, err
}
newPlan := mergeJoinInputs(ctx, lhs, rhs, joinPredicates, newJoinMerge(joinPredicates, inner))
if newPlan != nil {
return newPlan, rewrite.NewTree("merge routes into single operator", newPlan), nil
}
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/planbuilder/operators/sharded_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func tryMergeJoinShardedRouting(
routeA, routeB *Route,
m merger,
joinPredicates []sqlparser.Expr,
) (*Route, error) {
) *Route {
sameKeyspace := routeA.Routing.Keyspace() == routeB.Routing.Keyspace()
tblA := routeA.Routing.(*ShardedRouting)
tblB := routeB.Routing.(*ShardedRouting)
Expand All @@ -605,20 +605,20 @@ func tryMergeJoinShardedRouting(
// If we are doing two Scatters, we have to make sure that the
// joins are on the correct vindex to allow them to be merged
// no join predicates - no vindex
return nil, nil
return nil
}

if !sameKeyspace {
return nil, vterrors.VT12001("cross-shard correlated subquery")
panic(vterrors.VT12001("cross-shard correlated subquery"))
}

canMerge := canMergeOnFilters(ctx, routeA, routeB, joinPredicates)
if !canMerge {
return nil, nil
return nil
}
return m.mergeShardedRouting(ctx, tblA, tblB, routeA, routeB)
}
return nil, nil
return nil
}

// makeEvalEngineExpr transforms the given sqlparser.Expr into an evalengine expression
Expand Down
68 changes: 29 additions & 39 deletions go/vt/vtgate/planbuilder/operators/subquery_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func isMergeable(ctx *plancontext.PlanningContext, query sqlparser.SelectStateme
return true
}

func settleSubqueries(ctx *plancontext.PlanningContext, op ops.Operator) (ops.Operator, error) {
func settleSubqueries(ctx *plancontext.PlanningContext, op ops.Operator) ops.Operator {
visit := func(op ops.Operator, lhsTables semantics.TableSet, isRoot bool) (ops.Operator, *rewrite.ApplyResult, error) {
switch op := op.(type) {
case *SubQueryContainer:
Expand Down Expand Up @@ -101,7 +101,11 @@ func settleSubqueries(ctx *plancontext.PlanningContext, op ops.Operator) (ops.Op
}
return op, rewrite.SameTree, nil
}
return rewrite.BottomUp(op, TableID, visit, nil)
op, err := rewrite.BottomUp(op, TableID, visit, nil)
if err != nil {
panic(err)
}
return op
}

func mergeSubqueryExpr(ctx *plancontext.PlanningContext, pe *ProjExpr) {
Expand Down Expand Up @@ -296,18 +300,15 @@ func tryMergeWithRHS(ctx *plancontext.PlanningContext, inner *SubQuery, outer *A
return nil, nil, nil
}

newExpr, err := rewriteOriginalPushedToRHS(ctx, inner.Original, outer)
if err != nil {
return nil, nil, err
}
newExpr := rewriteOriginalPushedToRHS(ctx, inner.Original, outer)
sqm := &subqueryRouteMerger{
outer: outerRoute,
original: newExpr,
subq: inner,
}
newOp, err := mergeSubqueryInputs(ctx, innerRoute, outerRoute, inner.GetMergePredicates(), sqm)
if err != nil || newOp == nil {
return nil, nil, err
newOp := mergeSubqueryInputs(ctx, innerRoute, outerRoute, inner.GetMergePredicates(), sqm)
if newOp == nil {
return nil, nil, nil
}

outer.RHS = newOp
Expand All @@ -334,7 +335,7 @@ func addSubQuery(in ops.Operator, inner *SubQuery) ops.Operator {
// rewriteOriginalPushedToRHS rewrites the original expression to use the argument names instead of the column names
// this is necessary because we are pushing the subquery into the RHS of the join, and we need to use the argument names
// instead of the column names
func rewriteOriginalPushedToRHS(ctx *plancontext.PlanningContext, expression sqlparser.Expr, outer *ApplyJoin) (sqlparser.Expr, error) {
func rewriteOriginalPushedToRHS(ctx *plancontext.PlanningContext, expression sqlparser.Expr, outer *ApplyJoin) sqlparser.Expr {
var err error
outerID := TableID(outer.LHS)
result := sqlparser.CopyOnRewrite(expression, nil, func(cursor *sqlparser.CopyOnWriteCursor) {
Expand All @@ -355,9 +356,9 @@ func rewriteOriginalPushedToRHS(ctx *plancontext.PlanningContext, expression sql
cursor.Replace(sqlparser.NewArgument(name))
}, nil)
if err != nil {
return nil, err
panic(err)
}
return result.(sqlparser.Expr), nil
return result.(sqlparser.Expr)
}

func pushProjectionToOuterContainer(ctx *plancontext.PlanningContext, p *Projection, src *SubQueryContainer) (ops.Operator, *rewrite.ApplyResult, error) {
Expand Down Expand Up @@ -486,10 +487,7 @@ func tryMergeSubqueriesRecursively(
original: subQuery.Original,
subq: subQuery,
}
op, err := mergeSubqueryInputs(ctx, inner.Outer, outer, exprs, merger)
if err != nil {
return nil, nil, err
}
op := mergeSubqueryInputs(ctx, inner.Outer, outer, exprs, merger)
if op == nil {
return outer, rewrite.SameTree, nil
}
Expand Down Expand Up @@ -524,10 +522,7 @@ func tryMergeSubqueryWithOuter(ctx *plancontext.PlanningContext, subQuery *SubQu
original: subQuery.Original,
subq: subQuery,
}
op, err := mergeSubqueryInputs(ctx, inner, outer, exprs, merger)
if err != nil {
return nil, nil, err
}
op := mergeSubqueryInputs(ctx, inner, outer, exprs, merger)
if op == nil {
return outer, rewrite.SameTree, nil
}
Expand Down Expand Up @@ -572,7 +567,7 @@ type subqueryRouteMerger struct {
subq *SubQuery
}

func (s *subqueryRouteMerger) mergeShardedRouting(ctx *plancontext.PlanningContext, r1, r2 *ShardedRouting, old1, old2 *Route) (*Route, error) {
func (s *subqueryRouteMerger) mergeShardedRouting(ctx *plancontext.PlanningContext, r1, r2 *ShardedRouting, old1, old2 *Route) *Route {
tr := &ShardedRouting{
VindexPreds: append(r1.VindexPreds, r2.VindexPreds...),
keyspace: r1.keyspace,
Expand Down Expand Up @@ -622,12 +617,12 @@ func (s *subqueryRouteMerger) mergeShardedRouting(ctx *plancontext.PlanningConte

routing, err := tr.resetRoutingLogic(ctx)
if err != nil {
return nil, err
panic(err)
}
return s.merge(ctx, old1, old2, routing)
}

func (s *subqueryRouteMerger) merge(ctx *plancontext.PlanningContext, inner, outer *Route, r Routing) (*Route, error) {
func (s *subqueryRouteMerger) merge(ctx *plancontext.PlanningContext, inner, outer *Route, r Routing) *Route {
if !s.subq.TopLevel {
// if the subquery we are merging isn't a top level predicate, we can't use it for routing
return &Route{
Expand All @@ -636,12 +631,10 @@ func (s *subqueryRouteMerger) merge(ctx *plancontext.PlanningContext, inner, out
Routing: outer.Routing,
Ordering: outer.Ordering,
ResultColumns: outer.ResultColumns,
}, nil

}
}
_, isSharded := r.(*ShardedRouting)
var src ops.Operator
var err error
if isSharded {
src = s.outer.Source
if !s.subq.IsProjection {
Expand All @@ -651,18 +644,15 @@ func (s *subqueryRouteMerger) merge(ctx *plancontext.PlanningContext, inner, out
}
}
} else {
src, err = s.rewriteASTExpression(ctx, inner)
if err != nil {
return nil, err
}
src = s.rewriteASTExpression(ctx, inner)
}
return &Route{
Source: src,
MergedWith: mergedWith(inner, outer),
Routing: r,
Ordering: s.outer.Ordering,
ResultColumns: s.outer.ResultColumns,
}, nil
}
}

// rewriteASTExpression rewrites the subquery expression that is used in the merged output
Expand All @@ -672,15 +662,15 @@ func (s *subqueryRouteMerger) merge(ctx *plancontext.PlanningContext, inner, out
// we should be able to use this method for all plan types,
// but using this method for sharded queries introduces bugs
// We really need to figure out why this is not working as expected
func (s *subqueryRouteMerger) rewriteASTExpression(ctx *plancontext.PlanningContext, inner *Route) (ops.Operator, error) {
func (s *subqueryRouteMerger) rewriteASTExpression(ctx *plancontext.PlanningContext, inner *Route) ops.Operator {
src := s.outer.Source
stmt, _, err := ToSQL(ctx, inner.Source)
if err != nil {
return nil, err
panic(err)
}
subqStmt, ok := stmt.(sqlparser.SelectStatement)
if !ok {
return nil, vterrors.VT13001("subqueries should only be select statement")
panic(vterrors.VT13001("subqueries should only be select statement"))
}
subqID := TableID(s.subq.Subquery)
subqStmt = sqlparser.CopyOnRewrite(subqStmt, nil, func(cursor *sqlparser.CopyOnWriteCursor) {
Expand Down Expand Up @@ -708,7 +698,7 @@ func (s *subqueryRouteMerger) rewriteASTExpression(ctx *plancontext.PlanningCont
}
}, nil).(sqlparser.SelectStatement)
if err != nil {
return nil, err
panic(err)
}

if s.subq.IsProjection {
Expand All @@ -726,17 +716,17 @@ func (s *subqueryRouteMerger) rewriteASTExpression(ctx *plancontext.PlanningCont
Predicates: []sqlparser.Expr{sQuery},
}
}
return src, nil
return src
}

// mergeSubqueryInputs checks whether two operators can be merged into a single one.
// If they can be merged, a new operator with the merged routing is returned
// If they cannot be merged, nil is returned.
// These rules are similar but different from join merging
func mergeSubqueryInputs(ctx *plancontext.PlanningContext, in, out ops.Operator, joinPredicates []sqlparser.Expr, m *subqueryRouteMerger) (*Route, error) {
func mergeSubqueryInputs(ctx *plancontext.PlanningContext, in, out ops.Operator, joinPredicates []sqlparser.Expr, m *subqueryRouteMerger) *Route {
inRoute, outRoute := operatorsToRoutes(in, out)
if inRoute == nil || outRoute == nil {
return nil, nil
return nil
}

inRoute, outRoute, inRouting, outRouting, sameKeyspace := getRoutesOrAlternates(inRoute, outRoute)
Expand Down Expand Up @@ -770,7 +760,7 @@ func mergeSubqueryInputs(ctx *plancontext.PlanningContext, in, out ops.Operator,
return tryMergeJoinShardedRouting(ctx, inRoute, outRoute, m, joinPredicates)

default:
return nil, nil
return nil
}
}

Expand Down

0 comments on commit ac9d992

Please sign in to comment.