From fae6169aebaa71185106f25fa95cd29f4df30ed4 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 29 Nov 2023 21:01:05 +0530 Subject: [PATCH 01/10] feat: added upsert engine primitive Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/upsert.go | 87 +++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 go/vt/vtgate/engine/upsert.go diff --git a/go/vt/vtgate/engine/upsert.go b/go/vt/vtgate/engine/upsert.go new file mode 100644 index 00000000000..23348d63ebc --- /dev/null +++ b/go/vt/vtgate/engine/upsert.go @@ -0,0 +1,87 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + +var _ Primitive = (*Upsert)(nil) + +// Upsert Primitive will execute the insert primitive first and +// if there is `Duplicate Key` error, it executes the update primitive. +type Upsert struct { + updPrimitive Primitive + insPrimitive Primitive + + txNeeded +} + +func (u *Upsert) RouteType() string { + return "UPSERT" +} + +func (u *Upsert) GetKeyspaceName() string { + return u.insPrimitive.GetKeyspaceName() +} + +func (u *Upsert) GetTableName() string { + return u.insPrimitive.GetTableName() +} + +func (u *Upsert) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return nil, vterrors.VT13001("unexpected to receive GetFields call for insert on duplicate key update query") +} + +func (u *Upsert) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + qr, err := vcursor.ExecutePrimitive(ctx, u.insPrimitive, bindVars, wantfields) + if err == nil { + return qr, nil + } + if vterrors.Code(err) != vtrpcpb.Code_ALREADY_EXISTS { + return nil, err + } + return vcursor.ExecutePrimitive(ctx, u.updPrimitive, bindVars, wantfields) +} + +func (u *Upsert) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + err := vcursor.StreamExecutePrimitive(ctx, u.insPrimitive, bindVars, wantfields, callback) + if err == nil { + return nil + } + if vterrors.Code(err) != vtrpcpb.Code_ALREADY_EXISTS { + return err + } + return vcursor.StreamExecutePrimitive(ctx, u.updPrimitive, bindVars, wantfields, callback) +} + +func (u *Upsert) Inputs() ([]Primitive, []map[string]any) { + return []Primitive{u.updPrimitive, u.insPrimitive}, nil +} + +func (u *Upsert) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "Upsert", + TargetTabletType: topodatapb.TabletType_PRIMARY, + } +} From 00962ee7f3d1336eef2a9f7703f4f17ce7ff1721 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 30 Nov 2023 12:00:25 +0530 Subject: [PATCH 02/10] sizegen update Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/cached_size.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 807c7604412..0c2b1254556 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1262,6 +1262,24 @@ func (cached *UpdateTarget) CachedSize(alloc bool) int64 { size += hack.RuntimeAllocSize(int64(len(cached.Target))) return size } +func (cached *Upsert) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(32) + } + // field updPrimitive vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.updPrimitive.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field insPrimitive vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.insPrimitive.(cachedObject); ok { + size += cc.CachedSize(true) + } + return size +} func (cached *UserDefinedVariable) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) From 6d5c612c9ed9badf715e1a4aa14e33476df00f11 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 30 Nov 2023 18:03:43 +0530 Subject: [PATCH 03/10] insert on duplicate key update plan for single row insert Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/cached_size.go | 8 +- go/vt/vtgate/engine/upsert.go | 18 +-- .../planbuilder/operator_transformers.go | 18 +++ go/vt/vtgate/planbuilder/operators/insert.go | 132 ++++++++++-------- go/vt/vtgate/planbuilder/operators/upsert.go | 125 +++++++++++++++++ go/vt/vtgate/planbuilder/upsert.go | 36 +++++ 6 files changed, 267 insertions(+), 70 deletions(-) create mode 100644 go/vt/vtgate/planbuilder/operators/upsert.go create mode 100644 go/vt/vtgate/planbuilder/upsert.go diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 0c2b1254556..7feb78fc8e7 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1270,12 +1270,12 @@ func (cached *Upsert) CachedSize(alloc bool) int64 { if alloc { size += int64(32) } - // field updPrimitive vitess.io/vitess/go/vt/vtgate/engine.Primitive - if cc, ok := cached.updPrimitive.(cachedObject); ok { + // field InsPrimitive vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.InsPrimitive.(cachedObject); ok { size += cc.CachedSize(true) } - // field insPrimitive vitess.io/vitess/go/vt/vtgate/engine.Primitive - if cc, ok := cached.insPrimitive.(cachedObject); ok { + // field UpdPrimitive vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.UpdPrimitive.(cachedObject); ok { size += cc.CachedSize(true) } return size diff --git a/go/vt/vtgate/engine/upsert.go b/go/vt/vtgate/engine/upsert.go index 23348d63ebc..299e9f267b2 100644 --- a/go/vt/vtgate/engine/upsert.go +++ b/go/vt/vtgate/engine/upsert.go @@ -31,8 +31,8 @@ var _ Primitive = (*Upsert)(nil) // Upsert Primitive will execute the insert primitive first and // if there is `Duplicate Key` error, it executes the update primitive. type Upsert struct { - updPrimitive Primitive - insPrimitive Primitive + InsPrimitive Primitive + UpdPrimitive Primitive txNeeded } @@ -42,11 +42,11 @@ func (u *Upsert) RouteType() string { } func (u *Upsert) GetKeyspaceName() string { - return u.insPrimitive.GetKeyspaceName() + return u.InsPrimitive.GetKeyspaceName() } func (u *Upsert) GetTableName() string { - return u.insPrimitive.GetTableName() + return u.InsPrimitive.GetTableName() } func (u *Upsert) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { @@ -54,29 +54,29 @@ func (u *Upsert) GetFields(ctx context.Context, vcursor VCursor, bindVars map[st } func (u *Upsert) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { - qr, err := vcursor.ExecutePrimitive(ctx, u.insPrimitive, bindVars, wantfields) + qr, err := vcursor.ExecutePrimitive(ctx, u.InsPrimitive, bindVars, wantfields) if err == nil { return qr, nil } if vterrors.Code(err) != vtrpcpb.Code_ALREADY_EXISTS { return nil, err } - return vcursor.ExecutePrimitive(ctx, u.updPrimitive, bindVars, wantfields) + return vcursor.ExecutePrimitive(ctx, u.UpdPrimitive, bindVars, wantfields) } func (u *Upsert) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { - err := vcursor.StreamExecutePrimitive(ctx, u.insPrimitive, bindVars, wantfields, callback) + err := vcursor.StreamExecutePrimitive(ctx, u.InsPrimitive, bindVars, wantfields, callback) if err == nil { return nil } if vterrors.Code(err) != vtrpcpb.Code_ALREADY_EXISTS { return err } - return vcursor.StreamExecutePrimitive(ctx, u.updPrimitive, bindVars, wantfields, callback) + return vcursor.StreamExecutePrimitive(ctx, u.UpdPrimitive, bindVars, wantfields, callback) } func (u *Upsert) Inputs() ([]Primitive, []map[string]any) { - return []Primitive{u.updPrimitive, u.insPrimitive}, nil + return []Primitive{u.UpdPrimitive, u.InsPrimitive}, nil } func (u *Upsert) description() PrimitiveDescription { diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index 3974a307e71..e4d194342c8 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -66,6 +66,8 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Opera return transformFkVerify(ctx, op) case *operators.InsertSelection: return transformInsertionSelection(ctx, op) + case *operators.Upsert: + return transformUpsert(ctx, op) case *operators.HashJoin: return transformHashJoin(ctx, op) case *operators.Sequential: @@ -75,6 +77,22 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Opera return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op)) } +func transformUpsert(ctx *plancontext.PlanningContext, op *operators.Upsert) (logicalPlan, error) { + ip, err := transformToLogicalPlan(ctx, op.InsertOp) + if err != nil { + return nil, err + } + up, err := transformToLogicalPlan(ctx, op.UpdateOp) + if err != nil { + return nil, err + } + + return &upsert{ + insert: ip, + update: up, + }, nil +} + func transformSequential(ctx *plancontext.PlanningContext, op *operators.Sequential) (logicalPlan, error) { var lps []logicalPlan for _, source := range op.Sources { diff --git a/go/vt/vtgate/planbuilder/operators/insert.go b/go/vt/vtgate/planbuilder/operators/insert.go index f783ac7a5bc..41e3b489d7d 100644 --- a/go/vt/vtgate/planbuilder/operators/insert.go +++ b/go/vt/vtgate/planbuilder/operators/insert.go @@ -141,6 +141,47 @@ func createOperatorFromInsert(ctx *plancontext.PlanningContext, ins *sqlparser.I return &Sequential{Sources: []Operator{delOp, insOp}} } +func checkAndCreateInsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, vTbl *vindexes.Table, routing Routing) Operator { + insOp := createInsertOperator(ctx, ins, vTbl, routing) + + // Find the foreign key mode and for unmanaged foreign-key-mode, we don't need to do anything. + ksMode, err := ctx.VSchema.ForeignKeyMode(vTbl.Keyspace.Name) + if err != nil { + panic(err) + } + if ksMode != vschemapb.Keyspace_managed { + return insOp + } + + parentFKs := ctx.SemTable.GetParentForeignKeysList() + childFks := ctx.SemTable.GetChildForeignKeysList() + if len(parentFKs) > 0 { + panic(vterrors.VT12002()) + } + if len(childFks) > 0 { + if ins.Action == sqlparser.ReplaceAct { + panic(vterrors.VT12001("REPLACE INTO with foreign keys")) + } + if len(ins.OnDup) > 0 { + row := getSingleRowOrError(ins) + return createUpsertOperator(ctx, ins, insOp, row, vTbl) + } + } + return insOp +} + +func getSingleRowOrError(ins *sqlparser.Insert) sqlparser.ValTuple { + switch rows := ins.Rows.(type) { + case sqlparser.SelectStatement: + panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with select statement")) + case sqlparser.Values: + if len(rows) == 1 { + return rows[0] + } + } + panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with multiple rows")) +} + func getWhereCondExpr(compExprs []*sqlparser.ComparisonExpr) sqlparser.Expr { var outputExpr sqlparser.Expr for _, expr := range compExprs { @@ -163,25 +204,7 @@ func pkCompExpression(vTbl *vindexes.Table, ins *sqlparser.Insert, rows sqlparse if len(vTbl.PrimaryKey) == 0 { return nil } - type pComp struct { - idx int - def sqlparser.Expr - } - var pIndexes []pComp - var pColTuple sqlparser.ValTuple - for _, pCol := range vTbl.PrimaryKey { - var def sqlparser.Expr - idx := ins.Columns.FindColumn(pCol) - if idx == -1 { - def = findDefault(vTbl, pCol) - if def == nil { - // If default value is empty, nothing to compare as it will always be false. - return nil - } - } - pIndexes = append(pIndexes, pComp{idx, def}) - pColTuple = append(pColTuple, sqlparser.NewColName(pCol.String())) - } + pIndexes, pColTuple := findPKIndexes(vTbl, ins) var pValTuple sqlparser.ValTuple for _, row := range rows { @@ -198,6 +221,29 @@ func pkCompExpression(vTbl *vindexes.Table, ins *sqlparser.Insert, rows sqlparse return sqlparser.NewComparisonExpr(sqlparser.InOp, pColTuple, pValTuple, nil) } +type pComp struct { + idx int + def sqlparser.Expr + col sqlparser.IdentifierCI +} + +func findPKIndexes(vTbl *vindexes.Table, ins *sqlparser.Insert) (pIndexes []pComp, pColTuple sqlparser.ValTuple) { + for _, pCol := range vTbl.PrimaryKey { + var def sqlparser.Expr + idx := ins.Columns.FindColumn(pCol) + if idx == -1 { + def = findDefault(vTbl, pCol) + if def == nil { + // If default value is empty, nothing to compare as it will always be false. + return nil, nil + } + } + pIndexes = append(pIndexes, pComp{idx, def, pCol}) + pColTuple = append(pColTuple, sqlparser.NewColName(pCol.String())) + } + return +} + func findDefault(vTbl *vindexes.Table, pCol sqlparser.IdentifierCI) sqlparser.Expr { for _, column := range vTbl.Columns { if column.Name.Equal(pCol) { @@ -311,42 +357,7 @@ func createUniqueKeyComp(ins *sqlparser.Insert, expr sqlparser.Expr, vTbl *vinde return offsets, false } -func checkAndCreateInsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, vTbl *vindexes.Table, routing Routing) Operator { - insOp := createInsertOperator(ctx, ins, vTbl, routing) - - if ins.Comments != nil { - insOp = &LockAndComment{ - Source: insOp, - Comments: ins.Comments, - } - } - - // Find the foreign key mode and for unmanaged foreign-key-mode, we don't need to do anything. - ksMode, err := ctx.VSchema.ForeignKeyMode(vTbl.Keyspace.Name) - if err != nil { - return nil - } - if ksMode != vschemapb.Keyspace_managed { - return insOp - } - - parentFKs := ctx.SemTable.GetParentForeignKeysList() - childFks := ctx.SemTable.GetChildForeignKeysList() - if len(parentFKs) > 0 { - panic(vterrors.VT12002()) - } - if len(childFks) > 0 { - if ins.Action == sqlparser.ReplaceAct { - panic(vterrors.VT12001("REPLACE INTO with foreign keys")) - } - if len(ins.OnDup) > 0 { - panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys")) - } - } - return insOp -} - -func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.Insert, vTbl *vindexes.Table, routing Routing) Operator { +func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.Insert, vTbl *vindexes.Table, routing Routing) (op Operator) { if _, target := routing.(*TargetedRouting); target { panic(vterrors.VT09017("INSERT with a target destination is not allowed")) } @@ -381,11 +392,18 @@ func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.I insOp.ColVindexes = getColVindexes(insOp) switch rows := insStmt.Rows.(type) { case sqlparser.Values: + op = route route.Source = insertRowsPlan(ctx, insOp, insStmt, rows) case sqlparser.SelectStatement: - return insertSelectPlan(ctx, insOp, route, insStmt, rows) + op = insertSelectPlan(ctx, insOp, route, insStmt, rows) + } + if insStmt.Comments != nil { + op = &LockAndComment{ + Source: op, + Comments: insStmt.Comments, + } } - return route + return op } func insertSelectPlan( diff --git a/go/vt/vtgate/planbuilder/operators/upsert.go b/go/vt/vtgate/planbuilder/operators/upsert.go new file mode 100644 index 00000000000..5f2b9778742 --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/upsert.go @@ -0,0 +1,125 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operators + +import ( + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +var _ Operator = (*Upsert)(nil) + +// Upsert represents an insert on duplicate key operation on a table. +type Upsert struct { + InsertOp Operator + UpdateOp Operator + + noColumns + noPredicates +} + +func (u *Upsert) Clone(inputs []Operator) Operator { + return &Upsert{ + InsertOp: inputs[0], + UpdateOp: inputs[1], + } +} + +func (u *Upsert) Inputs() []Operator { + return []Operator{u.InsertOp, u.UpdateOp} +} + +func (u *Upsert) SetInputs(operators []Operator) { + u.InsertOp = operators[0] + u.UpdateOp = operators[1] +} + +func (u *Upsert) ShortDescription() string { + return "" +} + +func (u *Upsert) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy { + return nil +} + +func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, insOp Operator, row sqlparser.ValTuple, vTbl *vindexes.Table) Operator { + if len(vTbl.UniqueKeys) != 0 { + panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with unique keys")) + } + + pIndexes, _ := findPKIndexes(vTbl, ins) + if len(pIndexes) == 0 { + // nothing to compare for update. + // Hence, only perform insert. + return insOp + } + + var whereExpr sqlparser.Expr + for _, pIdx := range pIndexes { + var expr sqlparser.Expr + if pIdx.idx == -1 { + expr = pIdx.def + } else { + expr = row[pIdx.idx] + } + equalExpr := sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil) + if whereExpr == nil { + whereExpr = equalExpr + } else { + whereExpr = sqlparser.AndExpressions(whereExpr, equalExpr) + } + } + + var updExprs sqlparser.UpdateExprs + for _, ue := range ins.OnDup { + expr := sqlparser.CopyOnRewrite(ue.Expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) { + vfExpr, ok := cursor.Node().(*sqlparser.ValuesFuncExpr) + if !ok { + return + } + idx := ins.Columns.FindColumn(vfExpr.Name.Name) + if idx == -1 { + panic(vterrors.VT03014(sqlparser.String(vfExpr.Name), "field list")) + } + cursor.Replace(row[idx]) + }, nil).(sqlparser.Expr) + updExprs = append(updExprs, &sqlparser.UpdateExpr{ + Name: ue.Name, + Expr: expr, + }) + } + + upd := &sqlparser.Update{ + Comments: ins.Comments, + TableExprs: sqlparser.TableExprs{ins.Table}, + Exprs: updExprs, + Where: sqlparser.NewWhere(sqlparser.WhereClause, whereExpr), + } + updOp := createOpFromStmt(ctx, upd, false, "") + + // replan insert statement without on duplicate key update. + ins = sqlparser.CloneRefOfInsert(ins) + ins.OnDup = nil + insOp = createOpFromStmt(ctx, ins, false, "") + + return &Upsert{ + InsertOp: insOp, + UpdateOp: updOp, + } +} diff --git a/go/vt/vtgate/planbuilder/upsert.go b/go/vt/vtgate/planbuilder/upsert.go new file mode 100644 index 00000000000..c9563e39a22 --- /dev/null +++ b/go/vt/vtgate/planbuilder/upsert.go @@ -0,0 +1,36 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package planbuilder + +import ( + "vitess.io/vitess/go/vt/vtgate/engine" +) + +type upsert struct { + insert logicalPlan + update logicalPlan +} + +var _ logicalPlan = (*upsert)(nil) + +// Primitive implements the logicalPlan interface +func (u *upsert) Primitive() engine.Primitive { + return &engine.Upsert{ + InsPrimitive: u.insert.Primitive(), + UpdPrimitive: u.update.Primitive(), + } +} From 59813b50beb4967e80a3129b873016d03d3ebb95 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 30 Nov 2023 18:05:34 +0530 Subject: [PATCH 04/10] disallow autocommit for insert in upsert plan Signed-off-by: Harshit Gangal --- go/vt/vtgate/planbuilder/operator_transformers.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index e4d194342c8..c434d07c38f 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -82,6 +82,9 @@ func transformUpsert(ctx *plancontext.PlanningContext, op *operators.Upsert) (lo if err != nil { return nil, err } + if ins, ok := ip.(*insert); ok { + ins.eInsert.PreventAutoCommit = true + } up, err := transformToLogicalPlan(ctx, op.UpdateOp) if err != nil { return nil, err From 7e7ad9902b56afcc07b4912db712e54ebb52c7be Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 30 Nov 2023 21:31:35 +0530 Subject: [PATCH 05/10] feat: update upsert engine to report correct affected rows, added unit and e2e tests Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/foreignkey/fk_test.go | 26 ++ go/vt/vtgate/engine/upsert.go | 21 +- .../testdata/foreignkey_cases.json | 323 +++++++++++++++++- .../testdata/foreignkey_checks_on_cases.json | 159 ++++++++- 4 files changed, 517 insertions(+), 12 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_test.go b/go/test/endtoend/vtgate/foreignkey/fk_test.go index 101cd313e21..8c9e7ac30c7 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_test.go @@ -1207,3 +1207,29 @@ func TestReplaceWithFK(t *testing.T) { utils.AssertMatches(t, conn, `select * from u_t1`, `[[INT64(1) INT64(1)] [INT64(2) INT64(2)]]`) utils.AssertMatches(t, conn, `select * from u_t2`, `[[INT64(1) NULL] [INT64(2) NULL]]`) } + +// TestInsertWithFKOnDup tests that insertion with on duplicate key update works as expected. +func TestInsertWithFKOnDup(t *testing.T) { + mcmp, closer := start(t) + defer closer() + + utils.Exec(t, mcmp.VtConn, "use `uks`") + + // insert some data. + mcmp.Exec(`insert into u_t1(id, col1) values (100, 1), (200, 2), (300, 3), (400, 4)`) + mcmp.Exec(`insert into u_t2(id, col2) values (1000, 1), (2000, 2), (3000, 3), (4000, 4)`) + + // updating child to an existing value in parent. + mcmp.Exec(`insert into u_t2(id, col2) values (4000, 50) on duplicate key update col2 = 1`) + mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) INT64(1)] [INT64(2000) INT64(2)] [INT64(3000) INT64(3)] [INT64(4000) INT64(1)]]`) + + // updating parent, value not referred in child. + mcmp.Exec(`insert into u_t1(id, col1) values (400, 50) on duplicate key update col1 = values(col1)`) + mcmp.AssertMatches(`select * from u_t1 order by id`, `[[INT64(100) INT64(1)] [INT64(200) INT64(2)] [INT64(300) INT64(3)] [INT64(400) INT64(50)]]`) + mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) INT64(1)] [INT64(2000) INT64(2)] [INT64(3000) INT64(3)] [INT64(4000) INT64(1)]]`) + + // updating parent, child updated to null. + mcmp.Exec(`insert into u_t1(id, col1) values (100, 75) on duplicate key update col1 = values(col1)`) + mcmp.AssertMatches(`select * from u_t1 order by id`, `[[INT64(100) INT64(75)] [INT64(200) INT64(2)] [INT64(300) INT64(3)] [INT64(400) INT64(50)]]`) + mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) NULL] [INT64(2000) INT64(2)] [INT64(3000) INT64(3)] [INT64(4000) NULL]]`) +} diff --git a/go/vt/vtgate/engine/upsert.go b/go/vt/vtgate/engine/upsert.go index 299e9f267b2..766effdc646 100644 --- a/go/vt/vtgate/engine/upsert.go +++ b/go/vt/vtgate/engine/upsert.go @@ -54,25 +54,28 @@ func (u *Upsert) GetFields(ctx context.Context, vcursor VCursor, bindVars map[st } func (u *Upsert) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { - qr, err := vcursor.ExecutePrimitive(ctx, u.InsPrimitive, bindVars, wantfields) + insQr, err := vcursor.ExecutePrimitive(ctx, u.InsPrimitive, bindVars, wantfields) if err == nil { - return qr, nil + return insQr, nil } if vterrors.Code(err) != vtrpcpb.Code_ALREADY_EXISTS { return nil, err } - return vcursor.ExecutePrimitive(ctx, u.UpdPrimitive, bindVars, wantfields) + updQr, err := vcursor.ExecutePrimitive(ctx, u.UpdPrimitive, bindVars, wantfields) + if err != nil { + return nil, err + } + // To match mysql, need to report +1 on rows affected. + updQr.RowsAffected += 1 + return updQr, nil } func (u *Upsert) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { - err := vcursor.StreamExecutePrimitive(ctx, u.InsPrimitive, bindVars, wantfields, callback) - if err == nil { - return nil - } - if vterrors.Code(err) != vtrpcpb.Code_ALREADY_EXISTS { + qr, err := u.TryExecute(ctx, vcursor, bindVars, wantfields) + if err != nil { return err } - return vcursor.StreamExecutePrimitive(ctx, u.UpdPrimitive, bindVars, wantfields, callback) + return callback(qr) } func (u *Upsert) Inputs() ([]Primitive, []map[string]any) { diff --git a/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json b/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json index deba02e2009..55562844f33 100644 --- a/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json @@ -1658,9 +1658,166 @@ } }, { - "comment": "Insert with on duplicate key update - foreign keys disallowed", + "comment": "Insert with on duplicate key update - foreign key with new value", "query": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = 5", - "plan": "VT12001: unsupported: ON DUPLICATE KEY UPDATE with foreign keys" + "plan": { + "QueryType": "INSERT", + "Original": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = 5", + "Instructions": { + "OperatorType": "Upsert", + "TargetTabletType": "PRIMARY", + "Inputs": [ + { + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col1 from u_tbl1 where 1 != 1", + "Query": "select col1 from u_tbl1 where id = 1 for update nowait", + "Table": "u_tbl1" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "FkCascade", + "BvName": "fkc_vals", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where (col2) in ::fkc_vals for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals1", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals1 and (col3) not in ((cast(5 as CHAR)))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=OFF) */ u_tbl2 set col2 = 5 where (col2) in ::fkc_vals", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "CascadeChild-2", + "OperatorType": "FkCascade", + "BvName": "fkc_vals2", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col9 from u_tbl9 where 1 != 1", + "Query": "select col9 from u_tbl9 where (col9) in ::fkc_vals2 and (col9) not in ((cast(5 as CHAR))) for update nowait", + "Table": "u_tbl9" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals3", + "Cols": [ + 0 + ], + "Query": "update u_tbl8 set col8 = null where (col8) in ::fkc_vals3", + "Table": "u_tbl8" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl9 set col9 = null where (col9) in ::fkc_vals2 and (col9) not in ((cast(5 as CHAR)))", + "Table": "u_tbl9" + } + ] + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl1 set col1 = 5 where id = 1", + "Table": "u_tbl1" + } + ] + }, + { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + } + ] + }, + "TablesUsed": [ + "unsharded_fk_allow.u_tbl1", + "unsharded_fk_allow.u_tbl2", + "unsharded_fk_allow.u_tbl3", + "unsharded_fk_allow.u_tbl8", + "unsharded_fk_allow.u_tbl9" + ] + } }, { "comment": "Insert with on duplicate key update - foreign keys not on update column - allowed", @@ -2511,5 +2668,167 @@ "sharded_fk_allow.tbl3" ] } + }, + { + "comment": "Insert with on duplicate key update - foreign key with values function", + "query": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = values(col1)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = values(col1)", + "Instructions": { + "OperatorType": "Upsert", + "TargetTabletType": "PRIMARY", + "Inputs": [ + { + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col1 from u_tbl1 where 1 != 1", + "Query": "select col1 from u_tbl1 where id = 1 for update nowait", + "Table": "u_tbl1" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "FkCascade", + "BvName": "fkc_vals", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where (col2) in ::fkc_vals for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals1", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals1 and (col3) not in ((cast(3 as CHAR)))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=OFF) */ u_tbl2 set col2 = 3 where (col2) in ::fkc_vals", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "CascadeChild-2", + "OperatorType": "FkCascade", + "BvName": "fkc_vals2", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col9 from u_tbl9 where 1 != 1", + "Query": "select col9 from u_tbl9 where (col9) in ::fkc_vals2 and (col9) not in ((cast(3 as CHAR))) for update nowait", + "Table": "u_tbl9" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals3", + "Cols": [ + 0 + ], + "Query": "update u_tbl8 set col8 = null where (col8) in ::fkc_vals3", + "Table": "u_tbl8" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl9 set col9 = null where (col9) in ::fkc_vals2 and (col9) not in ((cast(3 as CHAR)))", + "Table": "u_tbl9" + } + ] + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl1 set col1 = 3 where id = 1", + "Table": "u_tbl1" + } + ] + }, + { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + } + ] + }, + "TablesUsed": [ + "unsharded_fk_allow.u_tbl1", + "unsharded_fk_allow.u_tbl2", + "unsharded_fk_allow.u_tbl3", + "unsharded_fk_allow.u_tbl8", + "unsharded_fk_allow.u_tbl9" + ] + } } ] diff --git a/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json b/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json index b4892a99052..f8a93c557b1 100644 --- a/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json @@ -1660,7 +1660,164 @@ { "comment": "Insert with on duplicate key update - foreign keys disallowed", "query": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = 5", - "plan": "VT12001: unsupported: ON DUPLICATE KEY UPDATE with foreign keys" + "plan": { + "QueryType": "INSERT", + "Original": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = 5", + "Instructions": { + "OperatorType": "Upsert", + "TargetTabletType": "PRIMARY", + "Inputs": [ + { + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col1 from u_tbl1 where 1 != 1", + "Query": "select col1 from u_tbl1 where id = 1 for update nowait", + "Table": "u_tbl1" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "FkCascade", + "BvName": "fkc_vals", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where (col2) in ::fkc_vals for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals1", + "Cols": [ + 0 + ], + "Query": "update /*+ SET_VAR(foreign_key_checks=ON) */ u_tbl3 set col3 = null where (col3) in ::fkc_vals1 and (col3) not in ((cast(5 as CHAR)))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=OFF) */ u_tbl2 set col2 = 5 where (col2) in ::fkc_vals", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "CascadeChild-2", + "OperatorType": "FkCascade", + "BvName": "fkc_vals2", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col9 from u_tbl9 where 1 != 1", + "Query": "select col9 from u_tbl9 where (col9) in ::fkc_vals2 and (col9) not in ((cast(5 as CHAR))) for update nowait", + "Table": "u_tbl9" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals3", + "Cols": [ + 0 + ], + "Query": "update /*+ SET_VAR(foreign_key_checks=ON) */ u_tbl8 set col8 = null where (col8) in ::fkc_vals3", + "Table": "u_tbl8" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=ON) */ u_tbl9 set col9 = null where (col9) in ::fkc_vals2 and (col9) not in ((cast(5 as CHAR)))", + "Table": "u_tbl9" + } + ] + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=On) */ u_tbl1 set col1 = 5 where id = 1", + "Table": "u_tbl1" + } + ] + }, + { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert /*+ SET_VAR(foreign_key_checks=On) */ into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + } + ] + }, + "TablesUsed": [ + "unsharded_fk_allow.u_tbl1", + "unsharded_fk_allow.u_tbl2", + "unsharded_fk_allow.u_tbl3", + "unsharded_fk_allow.u_tbl8", + "unsharded_fk_allow.u_tbl9" + ] + } }, { "comment": "Insert with on duplicate key update - foreign keys not on update column - allowed", From 2cec09eda1067d9cdd35ad081d444556a857e5e1 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 5 Dec 2023 13:48:35 +0530 Subject: [PATCH 06/10] feat: update upsert engine to support multiple upserts Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/cached_size.go | 33 +++++++--- go/vt/vtgate/engine/upsert.go | 60 ++++++++++++++++--- .../testdata/foreignkey_cases.json | 52 ++++++++-------- .../testdata/foreignkey_checks_on_cases.json | 26 ++++---- go/vt/vtgate/planbuilder/upsert.go | 7 +-- 5 files changed, 123 insertions(+), 55 deletions(-) diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 7feb78fc8e7..db147f4d36c 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1268,15 +1268,14 @@ func (cached *Upsert) CachedSize(alloc bool) int64 { } size := int64(0) if alloc { - size += int64(32) - } - // field InsPrimitive vitess.io/vitess/go/vt/vtgate/engine.Primitive - if cc, ok := cached.InsPrimitive.(cachedObject); ok { - size += cc.CachedSize(true) + size += int64(24) } - // field UpdPrimitive vitess.io/vitess/go/vt/vtgate/engine.Primitive - if cc, ok := cached.UpdPrimitive.(cachedObject); ok { - size += cc.CachedSize(true) + // field Upserts []vitess.io/vitess/go/vt/vtgate/engine.upsert + { + size += hack.RuntimeAllocSize(int64(cap(cached.Upserts)) * int64(32)) + for _, elem := range cached.Upserts { + size += elem.CachedSize(false) + } } return size } @@ -1491,3 +1490,21 @@ func (cached *shardRoute) CachedSize(alloc bool) int64 { } return size } +func (cached *upsert) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(32) + } + // field Insert vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Insert.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field Update vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Update.(cachedObject); ok { + size += cc.CachedSize(true) + } + return size +} diff --git a/go/vt/vtgate/engine/upsert.go b/go/vt/vtgate/engine/upsert.go index 766effdc646..6e155741b85 100644 --- a/go/vt/vtgate/engine/upsert.go +++ b/go/vt/vtgate/engine/upsert.go @@ -18,6 +18,7 @@ package engine import ( "context" + "fmt" "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -31,37 +32,72 @@ var _ Primitive = (*Upsert)(nil) // Upsert Primitive will execute the insert primitive first and // if there is `Duplicate Key` error, it executes the update primitive. type Upsert struct { - InsPrimitive Primitive - UpdPrimitive Primitive + Upserts []upsert txNeeded } +type upsert struct { + Insert Primitive + Update Primitive +} + +// AddUpsert appends to the Upsert Primitive. +func (u *Upsert) AddUpsert(ins, upd Primitive) { + u.Upserts = append(u.Upserts, upsert{ + Insert: ins, + Update: upd, + }) +} + +// RouteType implements Primitive interface type. func (u *Upsert) RouteType() string { return "UPSERT" } +// GetKeyspaceName implements Primitive interface type. func (u *Upsert) GetKeyspaceName() string { - return u.InsPrimitive.GetKeyspaceName() + if len(u.Upserts) > 0 { + return u.Upserts[0].Insert.GetKeyspaceName() + } + return "" } +// GetTableName implements Primitive interface type. func (u *Upsert) GetTableName() string { - return u.InsPrimitive.GetTableName() + if len(u.Upserts) > 0 { + return u.Upserts[0].Insert.GetTableName() + } + return "" } +// GetFields implements Primitive interface type. func (u *Upsert) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { return nil, vterrors.VT13001("unexpected to receive GetFields call for insert on duplicate key update query") } +// TryExecute implements Primitive interface type. func (u *Upsert) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { - insQr, err := vcursor.ExecutePrimitive(ctx, u.InsPrimitive, bindVars, wantfields) + result := &sqltypes.Result{} + for _, up := range u.Upserts { + qr, err := execOne(ctx, vcursor, bindVars, wantfields, up) + if err != nil { + return nil, err + } + result.RowsAffected += qr.RowsAffected + } + return result, nil +} + +func execOne(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, up upsert) (*sqltypes.Result, error) { + insQr, err := vcursor.ExecutePrimitive(ctx, up.Insert, bindVars, wantfields) if err == nil { return insQr, nil } if vterrors.Code(err) != vtrpcpb.Code_ALREADY_EXISTS { return nil, err } - updQr, err := vcursor.ExecutePrimitive(ctx, u.UpdPrimitive, bindVars, wantfields) + updQr, err := vcursor.ExecutePrimitive(ctx, up.Update, bindVars, wantfields) if err != nil { return nil, err } @@ -70,6 +106,7 @@ func (u *Upsert) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[s return updQr, nil } +// TryStreamExecute implements Primitive interface type. func (u *Upsert) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { qr, err := u.TryExecute(ctx, vcursor, bindVars, wantfields) if err != nil { @@ -78,8 +115,17 @@ func (u *Upsert) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars return callback(qr) } +// Inputs implements Primitive interface type. func (u *Upsert) Inputs() ([]Primitive, []map[string]any) { - return []Primitive{u.UpdPrimitive, u.InsPrimitive}, nil + var inputs []Primitive + var inputsMap []map[string]any + for i, up := range u.Upserts { + inputs = append(inputs, up.Insert, up.Update) + inputsMap = append(inputsMap, + map[string]any{inputName: fmt.Sprintf("Insert-%d", i+1)}, + map[string]any{inputName: fmt.Sprintf("Update-%d", i+1)}) + } + return inputs, inputsMap } func (u *Upsert) description() PrimitiveDescription { diff --git a/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json b/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json index 55562844f33..9e59d96b797 100644 --- a/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json @@ -1668,6 +1668,20 @@ "TargetTabletType": "PRIMARY", "Inputs": [ { + "InputName": "Insert-1", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + }, + { + "InputName": "Update-1", "OperatorType": "FkCascade", "Inputs": [ { @@ -1795,18 +1809,6 @@ "Table": "u_tbl1" } ] - }, - { - "OperatorType": "Insert", - "Variant": "Unsharded", - "Keyspace": { - "Name": "unsharded_fk_allow", - "Sharded": false - }, - "TargetTabletType": "PRIMARY", - "NoAutoCommit": true, - "Query": "insert into u_tbl1(id, col1) values (1, 3)", - "TableName": "u_tbl1" } ] }, @@ -2680,6 +2682,20 @@ "TargetTabletType": "PRIMARY", "Inputs": [ { + "InputName": "Insert-1", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + }, + { + "InputName": "Update-1", "OperatorType": "FkCascade", "Inputs": [ { @@ -2807,18 +2823,6 @@ "Table": "u_tbl1" } ] - }, - { - "OperatorType": "Insert", - "Variant": "Unsharded", - "Keyspace": { - "Name": "unsharded_fk_allow", - "Sharded": false - }, - "TargetTabletType": "PRIMARY", - "NoAutoCommit": true, - "Query": "insert into u_tbl1(id, col1) values (1, 3)", - "TableName": "u_tbl1" } ] }, diff --git a/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json b/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json index f8a93c557b1..415b0c12987 100644 --- a/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json @@ -1668,6 +1668,20 @@ "TargetTabletType": "PRIMARY", "Inputs": [ { + "InputName": "Insert-1", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert /*+ SET_VAR(foreign_key_checks=On) */ into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + }, + { + "InputName": "Update-1", "OperatorType": "FkCascade", "Inputs": [ { @@ -1795,18 +1809,6 @@ "Table": "u_tbl1" } ] - }, - { - "OperatorType": "Insert", - "Variant": "Unsharded", - "Keyspace": { - "Name": "unsharded_fk_allow", - "Sharded": false - }, - "TargetTabletType": "PRIMARY", - "NoAutoCommit": true, - "Query": "insert /*+ SET_VAR(foreign_key_checks=On) */ into u_tbl1(id, col1) values (1, 3)", - "TableName": "u_tbl1" } ] }, diff --git a/go/vt/vtgate/planbuilder/upsert.go b/go/vt/vtgate/planbuilder/upsert.go index c9563e39a22..097ac3c13fc 100644 --- a/go/vt/vtgate/planbuilder/upsert.go +++ b/go/vt/vtgate/planbuilder/upsert.go @@ -29,8 +29,7 @@ var _ logicalPlan = (*upsert)(nil) // Primitive implements the logicalPlan interface func (u *upsert) Primitive() engine.Primitive { - return &engine.Upsert{ - InsPrimitive: u.insert.Primitive(), - UpdPrimitive: u.update.Primitive(), - } + up := &engine.Upsert{} + up.AddUpsert(u.insert.Primitive(), u.update.Primitive()) + return up } From 213e423cd2c5165e5fa79f2045c0c7689345e520 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 5 Dec 2023 19:52:56 +0530 Subject: [PATCH 07/10] feat: plan insert on duplicate key update with multiple rows Signed-off-by: Harshit Gangal --- .../planbuilder/operator_transformers.go | 30 +-- go/vt/vtgate/planbuilder/operators/insert.go | 17 +- go/vt/vtgate/planbuilder/operators/upsert.go | 133 +++++++----- .../testdata/foreignkey_cases.json | 195 ++++++++++++++++++ go/vt/vtgate/planbuilder/upsert.go | 8 +- 5 files changed, 301 insertions(+), 82 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index c434d07c38f..8bdfd8735a0 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -78,22 +78,28 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Opera } func transformUpsert(ctx *plancontext.PlanningContext, op *operators.Upsert) (logicalPlan, error) { - ip, err := transformToLogicalPlan(ctx, op.InsertOp) + u := &upsert{} + for _, source := range op.Sources { + iLp, uLp, err := transformOneUpsert(ctx, source) + if err != nil { + return nil, err + } + u.insert = append(u.insert, iLp) + u.update = append(u.update, uLp) + } + return u, nil +} + +func transformOneUpsert(ctx *plancontext.PlanningContext, source operators.UpsertSource) (iLp, uLp logicalPlan, err error) { + iLp, err = transformToLogicalPlan(ctx, source.Insert) if err != nil { - return nil, err + return } - if ins, ok := ip.(*insert); ok { + if ins, ok := iLp.(*insert); ok { ins.eInsert.PreventAutoCommit = true } - up, err := transformToLogicalPlan(ctx, op.UpdateOp) - if err != nil { - return nil, err - } - - return &upsert{ - insert: ip, - update: up, - }, nil + uLp, err = transformToLogicalPlan(ctx, source.Update) + return } func transformSequential(ctx *plancontext.PlanningContext, op *operators.Sequential) (logicalPlan, error) { diff --git a/go/vt/vtgate/planbuilder/operators/insert.go b/go/vt/vtgate/planbuilder/operators/insert.go index 41e3b489d7d..275849a37f8 100644 --- a/go/vt/vtgate/planbuilder/operators/insert.go +++ b/go/vt/vtgate/planbuilder/operators/insert.go @@ -163,23 +163,18 @@ func checkAndCreateInsertOperator(ctx *plancontext.PlanningContext, ins *sqlpars panic(vterrors.VT12001("REPLACE INTO with foreign keys")) } if len(ins.OnDup) > 0 { - row := getSingleRowOrError(ins) - return createUpsertOperator(ctx, ins, insOp, row, vTbl) + rows := getRowsOrError(ins) + return createUpsertOperator(ctx, ins, insOp, rows, vTbl) } } return insOp } -func getSingleRowOrError(ins *sqlparser.Insert) sqlparser.ValTuple { - switch rows := ins.Rows.(type) { - case sqlparser.SelectStatement: - panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with select statement")) - case sqlparser.Values: - if len(rows) == 1 { - return rows[0] - } +func getRowsOrError(ins *sqlparser.Insert) sqlparser.Values { + if rows, ok := ins.Rows.(sqlparser.Values); ok { + return rows } - panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with multiple rows")) + panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with select statement")) } func getWhereCondExpr(compExprs []*sqlparser.ComparisonExpr) sqlparser.Expr { diff --git a/go/vt/vtgate/planbuilder/operators/upsert.go b/go/vt/vtgate/planbuilder/operators/upsert.go index 5f2b9778742..8cd3cd9a521 100644 --- a/go/vt/vtgate/planbuilder/operators/upsert.go +++ b/go/vt/vtgate/planbuilder/operators/upsert.go @@ -27,27 +27,43 @@ var _ Operator = (*Upsert)(nil) // Upsert represents an insert on duplicate key operation on a table. type Upsert struct { - InsertOp Operator - UpdateOp Operator + Sources []UpsertSource noColumns noPredicates } +type UpsertSource struct { + Insert Operator + Update Operator +} + func (u *Upsert) Clone(inputs []Operator) Operator { - return &Upsert{ - InsertOp: inputs[0], - UpdateOp: inputs[1], + up := &Upsert{} + up.setInputs(inputs) + return up +} + +func (u *Upsert) setInputs(inputs []Operator) { + for i := 0; i < len(inputs); i += 2 { + u.Sources = append(u.Sources, UpsertSource{ + Insert: inputs[i], + Update: inputs[i+1], + }) } } func (u *Upsert) Inputs() []Operator { - return []Operator{u.InsertOp, u.UpdateOp} + var inputs []Operator + for _, source := range u.Sources { + inputs = append(inputs, source.Insert, source.Update) + } + return inputs } -func (u *Upsert) SetInputs(operators []Operator) { - u.InsertOp = operators[0] - u.UpdateOp = operators[1] +func (u *Upsert) SetInputs(inputs []Operator) { + u.Sources = nil + u.setInputs(inputs) } func (u *Upsert) ShortDescription() string { @@ -58,7 +74,7 @@ func (u *Upsert) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy { return nil } -func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, insOp Operator, row sqlparser.ValTuple, vTbl *vindexes.Table) Operator { +func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, insOp Operator, rows sqlparser.Values, vTbl *vindexes.Table) Operator { if len(vTbl.UniqueKeys) != 0 { panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with unique keys")) } @@ -70,56 +86,61 @@ func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Inser return insOp } - var whereExpr sqlparser.Expr - for _, pIdx := range pIndexes { - var expr sqlparser.Expr - if pIdx.idx == -1 { - expr = pIdx.def - } else { - expr = row[pIdx.idx] + upsert := &Upsert{} + for _, row := range rows { + var whereExpr sqlparser.Expr + for _, pIdx := range pIndexes { + var expr sqlparser.Expr + if pIdx.idx == -1 { + expr = pIdx.def + } else { + expr = row[pIdx.idx] + } + equalExpr := sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil) + if whereExpr == nil { + whereExpr = equalExpr + } else { + whereExpr = sqlparser.AndExpressions(whereExpr, equalExpr) + } } - equalExpr := sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil) - if whereExpr == nil { - whereExpr = equalExpr - } else { - whereExpr = sqlparser.AndExpressions(whereExpr, equalExpr) + + var updExprs sqlparser.UpdateExprs + for _, ue := range ins.OnDup { + expr := sqlparser.CopyOnRewrite(ue.Expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) { + vfExpr, ok := cursor.Node().(*sqlparser.ValuesFuncExpr) + if !ok { + return + } + idx := ins.Columns.FindColumn(vfExpr.Name.Name) + if idx == -1 { + panic(vterrors.VT03014(sqlparser.String(vfExpr.Name), "field list")) + } + cursor.Replace(row[idx]) + }, nil).(sqlparser.Expr) + updExprs = append(updExprs, &sqlparser.UpdateExpr{ + Name: ue.Name, + Expr: expr, + }) } - } - var updExprs sqlparser.UpdateExprs - for _, ue := range ins.OnDup { - expr := sqlparser.CopyOnRewrite(ue.Expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) { - vfExpr, ok := cursor.Node().(*sqlparser.ValuesFuncExpr) - if !ok { - return - } - idx := ins.Columns.FindColumn(vfExpr.Name.Name) - if idx == -1 { - panic(vterrors.VT03014(sqlparser.String(vfExpr.Name), "field list")) - } - cursor.Replace(row[idx]) - }, nil).(sqlparser.Expr) - updExprs = append(updExprs, &sqlparser.UpdateExpr{ - Name: ue.Name, - Expr: expr, + upd := &sqlparser.Update{ + Comments: ins.Comments, + TableExprs: sqlparser.TableExprs{ins.Table}, + Exprs: updExprs, + Where: sqlparser.NewWhere(sqlparser.WhereClause, whereExpr), + } + updOp := createOpFromStmt(ctx, upd, false, "") + + // replan insert statement without on duplicate key update. + newInsert := sqlparser.CloneRefOfInsert(ins) + newInsert.OnDup = nil + newInsert.Rows = sqlparser.Values{row} + insOp = createOpFromStmt(ctx, newInsert, false, "") + upsert.Sources = append(upsert.Sources, UpsertSource{ + Insert: insOp, + Update: updOp, }) } - upd := &sqlparser.Update{ - Comments: ins.Comments, - TableExprs: sqlparser.TableExprs{ins.Table}, - Exprs: updExprs, - Where: sqlparser.NewWhere(sqlparser.WhereClause, whereExpr), - } - updOp := createOpFromStmt(ctx, upd, false, "") - - // replan insert statement without on duplicate key update. - ins = sqlparser.CloneRefOfInsert(ins) - ins.OnDup = nil - insOp = createOpFromStmt(ctx, ins, false, "") - - return &Upsert{ - InsertOp: insOp, - UpdateOp: updOp, - } + return upsert } diff --git a/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json b/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json index 9e59d96b797..3ab6ef96118 100644 --- a/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json @@ -2834,5 +2834,200 @@ "unsharded_fk_allow.u_tbl9" ] } + }, + { + "comment": "insert with on duplicate key update with multiple rows", + "query": "insert into u_tbl2 (id, col2) values (:v1, :v2),(:v3, :v4), (:v5, :v6) on duplicate key update col2 = values(col2)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into u_tbl2 (id, col2) values (:v1, :v2),(:v3, :v4), (:v5, :v6) on duplicate key update col2 = values(col2)", + "Instructions": { + "OperatorType": "Upsert", + "TargetTabletType": "PRIMARY", + "Inputs": [ + { + "InputName": "Insert-1", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl2(id, col2) values (:v1, :v2)", + "TableName": "u_tbl2" + }, + { + "InputName": "Update-1", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where id = :v1 for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals and (cast(:v2 as CHAR) is null or (col3) not in ((cast(:v2 as CHAR))))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl2 set col2 = :v2 where id = :v1", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "Insert-2", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl2(id, col2) values (:v3, :v4)", + "TableName": "u_tbl2" + }, + { + "InputName": "Update-2", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where id = :v3 for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals1", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals1 and (cast(:v4 as CHAR) is null or (col3) not in ((cast(:v4 as CHAR))))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl2 set col2 = :v4 where id = :v3", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "Insert-3", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl2(id, col2) values (:v5, :v6)", + "TableName": "u_tbl2" + }, + { + "InputName": "Update-3", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where id = :v5 for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals2", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals2 and (cast(:v6 as CHAR) is null or (col3) not in ((cast(:v6 as CHAR))))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl2 set col2 = :v6 where id = :v5", + "Table": "u_tbl2" + } + ] + } + ] + }, + "TablesUsed": [ + "unsharded_fk_allow.u_tbl2", + "unsharded_fk_allow.u_tbl3" + ] + } } ] diff --git a/go/vt/vtgate/planbuilder/upsert.go b/go/vt/vtgate/planbuilder/upsert.go index 097ac3c13fc..cd9c127635c 100644 --- a/go/vt/vtgate/planbuilder/upsert.go +++ b/go/vt/vtgate/planbuilder/upsert.go @@ -21,8 +21,8 @@ import ( ) type upsert struct { - insert logicalPlan - update logicalPlan + insert []logicalPlan + update []logicalPlan } var _ logicalPlan = (*upsert)(nil) @@ -30,6 +30,8 @@ var _ logicalPlan = (*upsert)(nil) // Primitive implements the logicalPlan interface func (u *upsert) Primitive() engine.Primitive { up := &engine.Upsert{} - up.AddUpsert(u.insert.Primitive(), u.update.Primitive()) + for i := 0; i < len(u.insert); i++ { + up.AddUpsert(u.insert[i].Primitive(), u.update[i].Primitive()) + } return up } From bc51d7e10c80ad0c38bffaa92ebb977a7b2b2729 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 5 Dec 2023 20:45:25 +0530 Subject: [PATCH 08/10] feat: add +1 to rowaffected in upsert engine only if update returns a rowaffected value, added e2e test for multiple rows Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/foreignkey/fk_test.go | 5 +++++ go/vt/vtgate/engine/upsert.go | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/fk_test.go b/go/test/endtoend/vtgate/foreignkey/fk_test.go index 8c9e7ac30c7..721bb4f9862 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_test.go @@ -1232,4 +1232,9 @@ func TestInsertWithFKOnDup(t *testing.T) { mcmp.Exec(`insert into u_t1(id, col1) values (100, 75) on duplicate key update col1 = values(col1)`) mcmp.AssertMatches(`select * from u_t1 order by id`, `[[INT64(100) INT64(75)] [INT64(200) INT64(2)] [INT64(300) INT64(3)] [INT64(400) INT64(50)]]`) mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) NULL] [INT64(2000) INT64(2)] [INT64(3000) INT64(3)] [INT64(4000) NULL]]`) + + // inserting multiple rows in parent, some child rows updated to null. + mcmp.Exec(`insert into u_t1(id, col1) values (100, 42),(600, 2),(300, 24),(200, 2) on duplicate key update col1 = values(col1)`) + mcmp.AssertMatches(`select * from u_t1 order by id`, `[[INT64(100) INT64(42)] [INT64(200) INT64(2)] [INT64(300) INT64(24)] [INT64(400) INT64(50)] [INT64(600) INT64(2)]]`) + mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) NULL] [INT64(2000) INT64(2)] [INT64(3000) NULL] [INT64(4000) NULL]]`) } diff --git a/go/vt/vtgate/engine/upsert.go b/go/vt/vtgate/engine/upsert.go index 6e155741b85..8542892ada4 100644 --- a/go/vt/vtgate/engine/upsert.go +++ b/go/vt/vtgate/engine/upsert.go @@ -101,8 +101,10 @@ func execOne(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb. if err != nil { return nil, err } - // To match mysql, need to report +1 on rows affected. - updQr.RowsAffected += 1 + // To match mysql, need to report +1 on rows affected if there is any change. + if updQr.RowsAffected > 0 { + updQr.RowsAffected += 1 + } return updQr, nil } From b3b4eddc2a436d79ab0568ba8fe005cb19dd3747 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 12 Dec 2023 14:36:55 +0530 Subject: [PATCH 09/10] simplified the if condition check Signed-off-by: Harshit Gangal --- go/vt/vtgate/planbuilder/operators/upsert.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/upsert.go b/go/vt/vtgate/planbuilder/operators/upsert.go index 8cd3cd9a521..fc6a7d48150 100644 --- a/go/vt/vtgate/planbuilder/operators/upsert.go +++ b/go/vt/vtgate/planbuilder/operators/upsert.go @@ -97,11 +97,7 @@ func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Inser expr = row[pIdx.idx] } equalExpr := sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil) - if whereExpr == nil { - whereExpr = equalExpr - } else { - whereExpr = sqlparser.AndExpressions(whereExpr, equalExpr) - } + whereExpr = sqlparser.AndExpressions(whereExpr, equalExpr) } var updExprs sqlparser.UpdateExprs From 50bd269144cd11af6af88facd2c08b6251abf17f Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 12 Dec 2023 16:39:00 +0530 Subject: [PATCH 10/10] addressed review comments Signed-off-by: Harshit Gangal --- go/vt/vtgate/planbuilder/operators/upsert.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/upsert.go b/go/vt/vtgate/planbuilder/operators/upsert.go index fc6a7d48150..8f028a790b2 100644 --- a/go/vt/vtgate/planbuilder/operators/upsert.go +++ b/go/vt/vtgate/planbuilder/operators/upsert.go @@ -88,7 +88,7 @@ func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Inser upsert := &Upsert{} for _, row := range rows { - var whereExpr sqlparser.Expr + var comparisons []sqlparser.Expr for _, pIdx := range pIndexes { var expr sqlparser.Expr if pIdx.idx == -1 { @@ -96,9 +96,10 @@ func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Inser } else { expr = row[pIdx.idx] } - equalExpr := sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil) - whereExpr = sqlparser.AndExpressions(whereExpr, equalExpr) + comparisons = append(comparisons, + sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil)) } + whereExpr := sqlparser.AndExpressions(comparisons...) var updExprs sqlparser.UpdateExprs for _, ue := range ins.OnDup {