Skip to content

Commit

Permalink
feat: plan insert on duplicate key update with multiple rows
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Dec 5, 2023
1 parent 2cec09e commit 213e423
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 82 deletions.
30 changes: 18 additions & 12 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,28 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Opera
}

func transformUpsert(ctx *plancontext.PlanningContext, op *operators.Upsert) (logicalPlan, error) {
ip, err := transformToLogicalPlan(ctx, op.InsertOp)
u := &upsert{}
for _, source := range op.Sources {
iLp, uLp, err := transformOneUpsert(ctx, source)
if err != nil {
return nil, err
}
u.insert = append(u.insert, iLp)
u.update = append(u.update, uLp)
}
return u, nil
}

func transformOneUpsert(ctx *plancontext.PlanningContext, source operators.UpsertSource) (iLp, uLp logicalPlan, err error) {
iLp, err = transformToLogicalPlan(ctx, source.Insert)
if err != nil {
return nil, err
return
}
if ins, ok := ip.(*insert); ok {
if ins, ok := iLp.(*insert); ok {
ins.eInsert.PreventAutoCommit = true
}
up, err := transformToLogicalPlan(ctx, op.UpdateOp)
if err != nil {
return nil, err
}

return &upsert{
insert: ip,
update: up,
}, nil
uLp, err = transformToLogicalPlan(ctx, source.Update)
return
}

func transformSequential(ctx *plancontext.PlanningContext, op *operators.Sequential) (logicalPlan, error) {
Expand Down
17 changes: 6 additions & 11 deletions go/vt/vtgate/planbuilder/operators/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,23 +163,18 @@ func checkAndCreateInsertOperator(ctx *plancontext.PlanningContext, ins *sqlpars
panic(vterrors.VT12001("REPLACE INTO with foreign keys"))
}
if len(ins.OnDup) > 0 {
row := getSingleRowOrError(ins)
return createUpsertOperator(ctx, ins, insOp, row, vTbl)
rows := getRowsOrError(ins)
return createUpsertOperator(ctx, ins, insOp, rows, vTbl)
}
}
return insOp
}

func getSingleRowOrError(ins *sqlparser.Insert) sqlparser.ValTuple {
switch rows := ins.Rows.(type) {
case sqlparser.SelectStatement:
panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with select statement"))
case sqlparser.Values:
if len(rows) == 1 {
return rows[0]
}
func getRowsOrError(ins *sqlparser.Insert) sqlparser.Values {
if rows, ok := ins.Rows.(sqlparser.Values); ok {
return rows
}
panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with multiple rows"))
panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with select statement"))
}

func getWhereCondExpr(compExprs []*sqlparser.ComparisonExpr) sqlparser.Expr {
Expand Down
133 changes: 77 additions & 56 deletions go/vt/vtgate/planbuilder/operators/upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,43 @@ var _ Operator = (*Upsert)(nil)

// Upsert represents an insert on duplicate key operation on a table.
type Upsert struct {
InsertOp Operator
UpdateOp Operator
Sources []UpsertSource

noColumns
noPredicates
}

type UpsertSource struct {
Insert Operator
Update Operator
}

func (u *Upsert) Clone(inputs []Operator) Operator {
return &Upsert{
InsertOp: inputs[0],
UpdateOp: inputs[1],
up := &Upsert{}
up.setInputs(inputs)
return up
}

func (u *Upsert) setInputs(inputs []Operator) {
for i := 0; i < len(inputs); i += 2 {
u.Sources = append(u.Sources, UpsertSource{
Insert: inputs[i],
Update: inputs[i+1],
})
}
}

func (u *Upsert) Inputs() []Operator {
return []Operator{u.InsertOp, u.UpdateOp}
var inputs []Operator
for _, source := range u.Sources {
inputs = append(inputs, source.Insert, source.Update)
}
return inputs
}

func (u *Upsert) SetInputs(operators []Operator) {
u.InsertOp = operators[0]
u.UpdateOp = operators[1]
func (u *Upsert) SetInputs(inputs []Operator) {
u.Sources = nil
u.setInputs(inputs)
}

func (u *Upsert) ShortDescription() string {
Expand All @@ -58,7 +74,7 @@ func (u *Upsert) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
return nil
}

func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, insOp Operator, row sqlparser.ValTuple, vTbl *vindexes.Table) Operator {
func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, insOp Operator, rows sqlparser.Values, vTbl *vindexes.Table) Operator {
if len(vTbl.UniqueKeys) != 0 {
panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with unique keys"))
}
Expand All @@ -70,56 +86,61 @@ func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Inser
return insOp
}

var whereExpr sqlparser.Expr
for _, pIdx := range pIndexes {
var expr sqlparser.Expr
if pIdx.idx == -1 {
expr = pIdx.def
} else {
expr = row[pIdx.idx]
upsert := &Upsert{}
for _, row := range rows {
var whereExpr sqlparser.Expr
for _, pIdx := range pIndexes {
var expr sqlparser.Expr
if pIdx.idx == -1 {
expr = pIdx.def
} else {
expr = row[pIdx.idx]
}
equalExpr := sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil)
if whereExpr == nil {
whereExpr = equalExpr
} else {
whereExpr = sqlparser.AndExpressions(whereExpr, equalExpr)
}
}
equalExpr := sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil)
if whereExpr == nil {
whereExpr = equalExpr
} else {
whereExpr = sqlparser.AndExpressions(whereExpr, equalExpr)

var updExprs sqlparser.UpdateExprs
for _, ue := range ins.OnDup {
expr := sqlparser.CopyOnRewrite(ue.Expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) {
vfExpr, ok := cursor.Node().(*sqlparser.ValuesFuncExpr)
if !ok {
return
}
idx := ins.Columns.FindColumn(vfExpr.Name.Name)
if idx == -1 {
panic(vterrors.VT03014(sqlparser.String(vfExpr.Name), "field list"))
}
cursor.Replace(row[idx])
}, nil).(sqlparser.Expr)
updExprs = append(updExprs, &sqlparser.UpdateExpr{
Name: ue.Name,
Expr: expr,
})
}
}

var updExprs sqlparser.UpdateExprs
for _, ue := range ins.OnDup {
expr := sqlparser.CopyOnRewrite(ue.Expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) {
vfExpr, ok := cursor.Node().(*sqlparser.ValuesFuncExpr)
if !ok {
return
}
idx := ins.Columns.FindColumn(vfExpr.Name.Name)
if idx == -1 {
panic(vterrors.VT03014(sqlparser.String(vfExpr.Name), "field list"))
}
cursor.Replace(row[idx])
}, nil).(sqlparser.Expr)
updExprs = append(updExprs, &sqlparser.UpdateExpr{
Name: ue.Name,
Expr: expr,
upd := &sqlparser.Update{
Comments: ins.Comments,
TableExprs: sqlparser.TableExprs{ins.Table},
Exprs: updExprs,
Where: sqlparser.NewWhere(sqlparser.WhereClause, whereExpr),
}
updOp := createOpFromStmt(ctx, upd, false, "")

// replan insert statement without on duplicate key update.
newInsert := sqlparser.CloneRefOfInsert(ins)
newInsert.OnDup = nil
newInsert.Rows = sqlparser.Values{row}
insOp = createOpFromStmt(ctx, newInsert, false, "")
upsert.Sources = append(upsert.Sources, UpsertSource{
Insert: insOp,
Update: updOp,
})
}

upd := &sqlparser.Update{
Comments: ins.Comments,
TableExprs: sqlparser.TableExprs{ins.Table},
Exprs: updExprs,
Where: sqlparser.NewWhere(sqlparser.WhereClause, whereExpr),
}
updOp := createOpFromStmt(ctx, upd, false, "")

// replan insert statement without on duplicate key update.
ins = sqlparser.CloneRefOfInsert(ins)
ins.OnDup = nil
insOp = createOpFromStmt(ctx, ins, false, "")

return &Upsert{
InsertOp: insOp,
UpdateOp: updOp,
}
return upsert
}
Loading

0 comments on commit 213e423

Please sign in to comment.