Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow Insert with Duplicate key update and Replace Into queries on foreign key column, set locks on fk queries #13953

Merged
merged 10 commits into from
Sep 13, 2023
1 change: 1 addition & 0 deletions go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
GetOrderBy() OrderBy
GetLimit() *Limit
SetLimit(*Limit)
GetLock() Lock
SetLock(lock Lock)
SetInto(into *SelectInto)
SetWith(with *With)
Expand Down
35 changes: 27 additions & 8 deletions go/vt/sqlparser/ast_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,11 @@ func (node *Select) GetLimit() *Limit {
return node.Limit
}

// GetLock returns the lock clause
func (node *Select) GetLock() Lock {
return node.Lock
}

// SetLock sets the lock clause
func (node *Select) SetLock(lock Lock) {
node.Lock = lock
Expand Down Expand Up @@ -1196,6 +1201,11 @@ func (node *Union) GetColumns() SelectExprs {
return node.Left.GetColumns()
}

// GetLock returns the lock clause
func (node *Union) GetLock() Lock {
return node.Lock
}

// SetLock sets the lock clause
func (node *Union) SetLock(lock Lock) {
node.Lock = lock
Expand Down Expand Up @@ -2533,13 +2543,22 @@ func (v *visitor) visitAllSelects(in SelectStatement, f func(p *Select, idx int)
panic("switch should be exhaustive")
}

func IsNonLiteral(updExprs UpdateExprs) bool {
for _, updateExpr := range updExprs {
switch updateExpr.Expr.(type) {
case *Argument, *NullVal, BoolVal, *Literal:
default:
return true
}
// IsRestrict returns true if the reference action is of restrict type.
func (ra ReferenceAction) IsRestrict() bool {
switch ra {
case Restrict, NoAction, DefaultAction:
return true
default:
return false
}
}

// IsLiteral returns true if the expression is of a literal type.
func IsLiteral(expr Expr) bool {
switch expr.(type) {
case *Argument, *NullVal, BoolVal, *Literal:
return true
default:
return false
}
return false
}
7 changes: 7 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,13 @@ func (c *ParsedComments) Length() int {
return len(c.comments)
}

func (c *ParsedComments) GetComments() Comments {
if c != nil {
return c.comments
}
return nil
}

func (c *ParsedComments) Prepend(comment string) Comments {
if c == nil {
return Comments{comment}
Expand Down
55 changes: 49 additions & 6 deletions go/vt/vtgate/planbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package planbuilder

import (
querypb "vitess.io/vitess/go/vt/proto/query"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
Expand All @@ -43,18 +45,24 @@ func gen4InsertStmtPlanner(version querypb.ExecuteOptions_PlannerVersion, insStm
// Check single unsharded. Even if the table is for single unsharded but sequence table is used.
// We cannot shortcut here as sequence column needs additional planning.
ks, tables := ctx.SemTable.SingleUnshardedKeyspace()
if ks != nil && tables[0].AutoIncrement == nil {
plan := insertUnshardedShortcut(insStmt, ks, tables)
plan = pushCommentDirectivesOnPlan(plan, insStmt)
return newPlanResult(plan.Primitive(), operators.QualifiedTables(ks, tables)...), nil
FkPlanNeeded := false
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
if ks != nil {
noAutoInc := tables[0].AutoIncrement == nil
FkPlanNeeded = fkManagementRequiredForInsert(ctx, tables[0], sqlparser.UpdateExprs(insStmt.OnDup), insStmt.Action == sqlparser.ReplaceAct)
if noAutoInc && !FkPlanNeeded {
plan := insertUnshardedShortcut(insStmt, ks, tables)
plan = pushCommentDirectivesOnPlan(plan, insStmt)
return newPlanResult(plan.Primitive(), operators.QualifiedTables(ks, tables)...), nil
}
}

tblInfo, err := ctx.SemTable.TableInfoFor(ctx.SemTable.TableSetFor(insStmt.Table))
if err != nil {
return nil, err
}
if tblInfo.GetVindexTable().Keyspace.Sharded && ctx.SemTable.NotUnshardedErr != nil {
return nil, ctx.SemTable.NotUnshardedErr

if err = errOutIfPlanCannotBeConstructed(ctx, tblInfo.GetVindexTable(), insStmt, FkPlanNeeded); err != nil {
return nil, err
}

err = queryRewrite(ctx.SemTable, reservedVars, insStmt)
Expand Down Expand Up @@ -83,6 +91,41 @@ func gen4InsertStmtPlanner(version querypb.ExecuteOptions_PlannerVersion, insStm
return newPlanResult(plan.Primitive(), operators.TablesUsed(op)...), nil
}

func errOutIfPlanCannotBeConstructed(ctx *plancontext.PlanningContext, vTbl *vindexes.Table, insStmt *sqlparser.Insert, fkPlanNeeded bool) error {
if vTbl.Keyspace.Sharded && ctx.SemTable.NotUnshardedErr != nil {
return ctx.SemTable.NotUnshardedErr
}
if insStmt.Action != sqlparser.ReplaceAct {
return nil
}
if fkPlanNeeded {
return vterrors.VT12001("REPLACE INTO with foreign keys")
}
return nil
}

// TODO: Handle all this in semantic analysis.
func fkManagementRequiredForInsert(ctx *plancontext.PlanningContext, vTbl *vindexes.Table, updateExprs sqlparser.UpdateExprs, replace bool) bool {
ksMode, err := ctx.VSchema.ForeignKeyMode(vTbl.Keyspace.Name)
if err != nil || ksMode != vschemapb.Keyspace_FK_MANAGED {
return false
}

if len(vTbl.ParentFKsNeedsHandling(ctx.VerifyAllFKs, "")) > 0 {
return true
}

childFks := vTbl.ChildFKsNeedsHandling(ctx.VerifyAllFKs, vindexes.UpdateAction)
if len(childFks) > 0 && replace {
return true
}

// Check if any column in the parent table is being updated which has a child foreign key.
return columnModified(updateExprs, func(expr *sqlparser.UpdateExpr) ([]vindexes.ParentFKInfo, []vindexes.ChildFKInfo) {
return nil, childFks
})
}

func insertUnshardedShortcut(stmt *sqlparser.Insert, ks *vindexes.Keyspace, tables []*vindexes.Table) logicalPlan {
eIns := &engine.Insert{}
eIns.Keyspace = ks
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,12 @@ func transformRoutePlan(ctx *plancontext.PlanningContext, op *operators.Route) (

switch stmt := stmt.(type) {
case sqlparser.SelectStatement:
if op.Lock != sqlparser.NoLock {
stmt.SetLock(op.Lock)
}
if op.Comments != nil {
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
stmt.SetComments(op.Comments.GetComments())
}
return buildRouteLogicalPlan(ctx, op, stmt)
case *sqlparser.Update:
return buildUpdateLogicalPlan(ctx, op, dmlOp, stmt)
Expand Down
Loading