From 31b0959ea6c0ecdece166c0e5eee9aa08d38031e Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 12 Dec 2023 18:07:09 +0530 Subject: [PATCH] Add foreign key support for insert on duplicate key update (#14638) Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/foreignkey/fk_test.go | 31 ++ go/vt/vtgate/engine/cached_size.go | 35 ++ go/vt/vtgate/engine/upsert.go | 138 +++++ .../planbuilder/operator_transformers.go | 27 + go/vt/vtgate/planbuilder/operators/insert.go | 127 +++-- go/vt/vtgate/planbuilder/operators/upsert.go | 143 +++++ .../testdata/foreignkey_cases.json | 522 +++++++++++++++++- .../testdata/foreignkey_checks_on_cases.json | 161 +++++- go/vt/vtgate/planbuilder/upsert.go | 37 ++ 9 files changed, 1161 insertions(+), 60 deletions(-) create mode 100644 go/vt/vtgate/engine/upsert.go create mode 100644 go/vt/vtgate/planbuilder/operators/upsert.go create mode 100644 go/vt/vtgate/planbuilder/upsert.go diff --git a/go/test/endtoend/vtgate/foreignkey/fk_test.go b/go/test/endtoend/vtgate/foreignkey/fk_test.go index c3f1d9dc29c..956c268d895 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_test.go @@ -1208,6 +1208,37 @@ func TestReplaceWithFK(t *testing.T) { utils.AssertMatches(t, conn, `select * from u_t2`, `[[INT64(1) NULL] [INT64(2) NULL]]`) } +// TestInsertWithFKOnDup tests that insertion with on duplicate key update works as expected. +func TestInsertWithFKOnDup(t *testing.T) { + mcmp, closer := start(t) + defer closer() + + utils.Exec(t, mcmp.VtConn, "use `uks`") + + // insert some data. + mcmp.Exec(`insert into u_t1(id, col1) values (100, 1), (200, 2), (300, 3), (400, 4)`) + mcmp.Exec(`insert into u_t2(id, col2) values (1000, 1), (2000, 2), (3000, 3), (4000, 4)`) + + // updating child to an existing value in parent. + mcmp.Exec(`insert into u_t2(id, col2) values (4000, 50) on duplicate key update col2 = 1`) + mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) INT64(1)] [INT64(2000) INT64(2)] [INT64(3000) INT64(3)] [INT64(4000) INT64(1)]]`) + + // updating parent, value not referred in child. + mcmp.Exec(`insert into u_t1(id, col1) values (400, 50) on duplicate key update col1 = values(col1)`) + mcmp.AssertMatches(`select * from u_t1 order by id`, `[[INT64(100) INT64(1)] [INT64(200) INT64(2)] [INT64(300) INT64(3)] [INT64(400) INT64(50)]]`) + mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) INT64(1)] [INT64(2000) INT64(2)] [INT64(3000) INT64(3)] [INT64(4000) INT64(1)]]`) + + // updating parent, child updated to null. + mcmp.Exec(`insert into u_t1(id, col1) values (100, 75) on duplicate key update col1 = values(col1)`) + mcmp.AssertMatches(`select * from u_t1 order by id`, `[[INT64(100) INT64(75)] [INT64(200) INT64(2)] [INT64(300) INT64(3)] [INT64(400) INT64(50)]]`) + mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) NULL] [INT64(2000) INT64(2)] [INT64(3000) INT64(3)] [INT64(4000) NULL]]`) + + // inserting multiple rows in parent, some child rows updated to null. + mcmp.Exec(`insert into u_t1(id, col1) values (100, 42),(600, 2),(300, 24),(200, 2) on duplicate key update col1 = values(col1)`) + mcmp.AssertMatches(`select * from u_t1 order by id`, `[[INT64(100) INT64(42)] [INT64(200) INT64(2)] [INT64(300) INT64(24)] [INT64(400) INT64(50)] [INT64(600) INT64(2)]]`) + mcmp.AssertMatches(`select * from u_t2 order by id`, `[[INT64(1000) NULL] [INT64(2000) INT64(2)] [INT64(3000) NULL] [INT64(4000) NULL]]`) +} + // TestDDLFk tests that table is created with fk constraint when foreign_key_checks is off. func TestDDLFk(t *testing.T) { mcmp, closer := start(t) diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index ee84317bc00..6da8b1b56d4 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1267,6 +1267,23 @@ func (cached *UpdateTarget) CachedSize(alloc bool) int64 { size += hack.RuntimeAllocSize(int64(len(cached.Target))) return size } +func (cached *Upsert) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(24) + } + // field Upserts []vitess.io/vitess/go/vt/vtgate/engine.upsert + { + size += hack.RuntimeAllocSize(int64(cap(cached.Upserts)) * int64(32)) + for _, elem := range cached.Upserts { + size += elem.CachedSize(false) + } + } + return size +} func (cached *UserDefinedVariable) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) @@ -1478,3 +1495,21 @@ func (cached *shardRoute) CachedSize(alloc bool) int64 { } return size } +func (cached *upsert) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(32) + } + // field Insert vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Insert.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field Update vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Update.(cachedObject); ok { + size += cc.CachedSize(true) + } + return size +} diff --git a/go/vt/vtgate/engine/upsert.go b/go/vt/vtgate/engine/upsert.go new file mode 100644 index 00000000000..8542892ada4 --- /dev/null +++ b/go/vt/vtgate/engine/upsert.go @@ -0,0 +1,138 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "fmt" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + +var _ Primitive = (*Upsert)(nil) + +// Upsert Primitive will execute the insert primitive first and +// if there is `Duplicate Key` error, it executes the update primitive. +type Upsert struct { + Upserts []upsert + + txNeeded +} + +type upsert struct { + Insert Primitive + Update Primitive +} + +// AddUpsert appends to the Upsert Primitive. +func (u *Upsert) AddUpsert(ins, upd Primitive) { + u.Upserts = append(u.Upserts, upsert{ + Insert: ins, + Update: upd, + }) +} + +// RouteType implements Primitive interface type. +func (u *Upsert) RouteType() string { + return "UPSERT" +} + +// GetKeyspaceName implements Primitive interface type. +func (u *Upsert) GetKeyspaceName() string { + if len(u.Upserts) > 0 { + return u.Upserts[0].Insert.GetKeyspaceName() + } + return "" +} + +// GetTableName implements Primitive interface type. +func (u *Upsert) GetTableName() string { + if len(u.Upserts) > 0 { + return u.Upserts[0].Insert.GetTableName() + } + return "" +} + +// GetFields implements Primitive interface type. +func (u *Upsert) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return nil, vterrors.VT13001("unexpected to receive GetFields call for insert on duplicate key update query") +} + +// TryExecute implements Primitive interface type. +func (u *Upsert) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + result := &sqltypes.Result{} + for _, up := range u.Upserts { + qr, err := execOne(ctx, vcursor, bindVars, wantfields, up) + if err != nil { + return nil, err + } + result.RowsAffected += qr.RowsAffected + } + return result, nil +} + +func execOne(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, up upsert) (*sqltypes.Result, error) { + insQr, err := vcursor.ExecutePrimitive(ctx, up.Insert, bindVars, wantfields) + if err == nil { + return insQr, nil + } + if vterrors.Code(err) != vtrpcpb.Code_ALREADY_EXISTS { + return nil, err + } + updQr, err := vcursor.ExecutePrimitive(ctx, up.Update, bindVars, wantfields) + if err != nil { + return nil, err + } + // To match mysql, need to report +1 on rows affected if there is any change. + if updQr.RowsAffected > 0 { + updQr.RowsAffected += 1 + } + return updQr, nil +} + +// TryStreamExecute implements Primitive interface type. +func (u *Upsert) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + qr, err := u.TryExecute(ctx, vcursor, bindVars, wantfields) + if err != nil { + return err + } + return callback(qr) +} + +// Inputs implements Primitive interface type. +func (u *Upsert) Inputs() ([]Primitive, []map[string]any) { + var inputs []Primitive + var inputsMap []map[string]any + for i, up := range u.Upserts { + inputs = append(inputs, up.Insert, up.Update) + inputsMap = append(inputsMap, + map[string]any{inputName: fmt.Sprintf("Insert-%d", i+1)}, + map[string]any{inputName: fmt.Sprintf("Update-%d", i+1)}) + } + return inputs, inputsMap +} + +func (u *Upsert) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "Upsert", + TargetTabletType: topodatapb.TabletType_PRIMARY, + } +} diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index 3b117411921..1adcc61972a 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -66,6 +66,8 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Opera return transformFkVerify(ctx, op) case *operators.InsertSelection: return transformInsertionSelection(ctx, op) + case *operators.Upsert: + return transformUpsert(ctx, op) case *operators.HashJoin: return transformHashJoin(ctx, op) case *operators.Sequential: @@ -75,6 +77,31 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Opera return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op)) } +func transformUpsert(ctx *plancontext.PlanningContext, op *operators.Upsert) (logicalPlan, error) { + u := &upsert{} + for _, source := range op.Sources { + iLp, uLp, err := transformOneUpsert(ctx, source) + if err != nil { + return nil, err + } + u.insert = append(u.insert, iLp) + u.update = append(u.update, uLp) + } + return u, nil +} + +func transformOneUpsert(ctx *plancontext.PlanningContext, source operators.UpsertSource) (iLp, uLp logicalPlan, err error) { + iLp, err = transformToLogicalPlan(ctx, source.Insert) + if err != nil { + return + } + if ins, ok := iLp.(*insert); ok { + ins.eInsert.PreventAutoCommit = true + } + uLp, err = transformToLogicalPlan(ctx, source.Update) + return +} + func transformSequential(ctx *plancontext.PlanningContext, op *operators.Sequential) (logicalPlan, error) { var lps []logicalPlan for _, source := range op.Sources { diff --git a/go/vt/vtgate/planbuilder/operators/insert.go b/go/vt/vtgate/planbuilder/operators/insert.go index 5cb25feb66c..a25d9bc471f 100644 --- a/go/vt/vtgate/planbuilder/operators/insert.go +++ b/go/vt/vtgate/planbuilder/operators/insert.go @@ -142,6 +142,42 @@ func createOperatorFromInsert(ctx *plancontext.PlanningContext, ins *sqlparser.I return &Sequential{Sources: []Operator{delOp, insOp}} } +func checkAndCreateInsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, vTbl *vindexes.Table, routing Routing) Operator { + insOp := createInsertOperator(ctx, ins, vTbl, routing) + + // Find the foreign key mode and for unmanaged foreign-key-mode, we don't need to do anything. + ksMode, err := ctx.VSchema.ForeignKeyMode(vTbl.Keyspace.Name) + if err != nil { + panic(err) + } + if ksMode != vschemapb.Keyspace_managed { + return insOp + } + + parentFKs := ctx.SemTable.GetParentForeignKeysList() + childFks := ctx.SemTable.GetChildForeignKeysList() + if len(parentFKs) > 0 { + panic(vterrors.VT12002()) + } + if len(childFks) > 0 { + if ins.Action == sqlparser.ReplaceAct { + panic(vterrors.VT12001("REPLACE INTO with foreign keys")) + } + if len(ins.OnDup) > 0 { + rows := getRowsOrError(ins) + return createUpsertOperator(ctx, ins, insOp, rows, vTbl) + } + } + return insOp +} + +func getRowsOrError(ins *sqlparser.Insert) sqlparser.Values { + if rows, ok := ins.Rows.(sqlparser.Values); ok { + return rows + } + panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with select statement")) +} + func getWhereCondExpr(compExprs []*sqlparser.ComparisonExpr) sqlparser.Expr { var outputExpr sqlparser.Expr for _, expr := range compExprs { @@ -164,25 +200,7 @@ func pkCompExpression(vTbl *vindexes.Table, ins *sqlparser.Insert, rows sqlparse if len(vTbl.PrimaryKey) == 0 { return nil } - type pComp struct { - idx int - def sqlparser.Expr - } - var pIndexes []pComp - var pColTuple sqlparser.ValTuple - for _, pCol := range vTbl.PrimaryKey { - var def sqlparser.Expr - idx := ins.Columns.FindColumn(pCol) - if idx == -1 { - def = findDefault(vTbl, pCol) - if def == nil { - // If default value is empty, nothing to compare as it will always be false. - return nil - } - } - pIndexes = append(pIndexes, pComp{idx, def}) - pColTuple = append(pColTuple, sqlparser.NewColName(pCol.String())) - } + pIndexes, pColTuple := findPKIndexes(vTbl, ins) var pValTuple sqlparser.ValTuple for _, row := range rows { @@ -199,6 +217,29 @@ func pkCompExpression(vTbl *vindexes.Table, ins *sqlparser.Insert, rows sqlparse return sqlparser.NewComparisonExpr(sqlparser.InOp, pColTuple, pValTuple, nil) } +type pComp struct { + idx int + def sqlparser.Expr + col sqlparser.IdentifierCI +} + +func findPKIndexes(vTbl *vindexes.Table, ins *sqlparser.Insert) (pIndexes []pComp, pColTuple sqlparser.ValTuple) { + for _, pCol := range vTbl.PrimaryKey { + var def sqlparser.Expr + idx := ins.Columns.FindColumn(pCol) + if idx == -1 { + def = findDefault(vTbl, pCol) + if def == nil { + // If default value is empty, nothing to compare as it will always be false. + return nil, nil + } + } + pIndexes = append(pIndexes, pComp{idx, def, pCol}) + pColTuple = append(pColTuple, sqlparser.NewColName(pCol.String())) + } + return +} + func findDefault(vTbl *vindexes.Table, pCol sqlparser.IdentifierCI) sqlparser.Expr { for _, column := range vTbl.Columns { if column.Name.Equal(pCol) { @@ -312,42 +353,7 @@ func createUniqueKeyComp(ins *sqlparser.Insert, expr sqlparser.Expr, vTbl *vinde return offsets, false } -func checkAndCreateInsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, vTbl *vindexes.Table, routing Routing) Operator { - insOp := createInsertOperator(ctx, ins, vTbl, routing) - - if ins.Comments != nil { - insOp = &LockAndComment{ - Source: insOp, - Comments: ins.Comments, - } - } - - // Find the foreign key mode and for unmanaged foreign-key-mode, we don't need to do anything. - ksMode, err := ctx.VSchema.ForeignKeyMode(vTbl.Keyspace.Name) - if err != nil { - return nil - } - if ksMode != vschemapb.Keyspace_managed { - return insOp - } - - parentFKs := ctx.SemTable.GetParentForeignKeysList() - childFks := ctx.SemTable.GetChildForeignKeysList() - if len(parentFKs) > 0 { - panic(vterrors.VT12002()) - } - if len(childFks) > 0 { - if ins.Action == sqlparser.ReplaceAct { - panic(vterrors.VT12001("REPLACE INTO with foreign keys")) - } - if len(ins.OnDup) > 0 { - panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys")) - } - } - return insOp -} - -func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.Insert, vTbl *vindexes.Table, routing Routing) Operator { +func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.Insert, vTbl *vindexes.Table, routing Routing) (op Operator) { if _, target := routing.(*TargetedRouting); target { panic(vterrors.VT09017("INSERT with a target destination is not allowed")) } @@ -382,11 +388,18 @@ func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.I insOp.ColVindexes = getColVindexes(insOp) switch rows := insStmt.Rows.(type) { case sqlparser.Values: + op = route route.Source = insertRowsPlan(ctx, insOp, insStmt, rows) case sqlparser.SelectStatement: - return insertSelectPlan(ctx, insOp, route, insStmt, rows) + op = insertSelectPlan(ctx, insOp, route, insStmt, rows) + } + if insStmt.Comments != nil { + op = &LockAndComment{ + Source: op, + Comments: insStmt.Comments, + } } - return route + return op } func insertSelectPlan( diff --git a/go/vt/vtgate/planbuilder/operators/upsert.go b/go/vt/vtgate/planbuilder/operators/upsert.go new file mode 100644 index 00000000000..8f028a790b2 --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/upsert.go @@ -0,0 +1,143 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operators + +import ( + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +var _ Operator = (*Upsert)(nil) + +// Upsert represents an insert on duplicate key operation on a table. +type Upsert struct { + Sources []UpsertSource + + noColumns + noPredicates +} + +type UpsertSource struct { + Insert Operator + Update Operator +} + +func (u *Upsert) Clone(inputs []Operator) Operator { + up := &Upsert{} + up.setInputs(inputs) + return up +} + +func (u *Upsert) setInputs(inputs []Operator) { + for i := 0; i < len(inputs); i += 2 { + u.Sources = append(u.Sources, UpsertSource{ + Insert: inputs[i], + Update: inputs[i+1], + }) + } +} + +func (u *Upsert) Inputs() []Operator { + var inputs []Operator + for _, source := range u.Sources { + inputs = append(inputs, source.Insert, source.Update) + } + return inputs +} + +func (u *Upsert) SetInputs(inputs []Operator) { + u.Sources = nil + u.setInputs(inputs) +} + +func (u *Upsert) ShortDescription() string { + return "" +} + +func (u *Upsert) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy { + return nil +} + +func createUpsertOperator(ctx *plancontext.PlanningContext, ins *sqlparser.Insert, insOp Operator, rows sqlparser.Values, vTbl *vindexes.Table) Operator { + if len(vTbl.UniqueKeys) != 0 { + panic(vterrors.VT12001("ON DUPLICATE KEY UPDATE with foreign keys with unique keys")) + } + + pIndexes, _ := findPKIndexes(vTbl, ins) + if len(pIndexes) == 0 { + // nothing to compare for update. + // Hence, only perform insert. + return insOp + } + + upsert := &Upsert{} + for _, row := range rows { + var comparisons []sqlparser.Expr + for _, pIdx := range pIndexes { + var expr sqlparser.Expr + if pIdx.idx == -1 { + expr = pIdx.def + } else { + expr = row[pIdx.idx] + } + comparisons = append(comparisons, + sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.NewColName(pIdx.col.String()), expr, nil)) + } + whereExpr := sqlparser.AndExpressions(comparisons...) + + var updExprs sqlparser.UpdateExprs + for _, ue := range ins.OnDup { + expr := sqlparser.CopyOnRewrite(ue.Expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) { + vfExpr, ok := cursor.Node().(*sqlparser.ValuesFuncExpr) + if !ok { + return + } + idx := ins.Columns.FindColumn(vfExpr.Name.Name) + if idx == -1 { + panic(vterrors.VT03014(sqlparser.String(vfExpr.Name), "field list")) + } + cursor.Replace(row[idx]) + }, nil).(sqlparser.Expr) + updExprs = append(updExprs, &sqlparser.UpdateExpr{ + Name: ue.Name, + Expr: expr, + }) + } + + upd := &sqlparser.Update{ + Comments: ins.Comments, + TableExprs: sqlparser.TableExprs{ins.Table}, + Exprs: updExprs, + Where: sqlparser.NewWhere(sqlparser.WhereClause, whereExpr), + } + updOp := createOpFromStmt(ctx, upd, false, "") + + // replan insert statement without on duplicate key update. + newInsert := sqlparser.CloneRefOfInsert(ins) + newInsert.OnDup = nil + newInsert.Rows = sqlparser.Values{row} + insOp = createOpFromStmt(ctx, newInsert, false, "") + upsert.Sources = append(upsert.Sources, UpsertSource{ + Insert: insOp, + Update: updOp, + }) + } + + return upsert +} diff --git a/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json b/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json index deba02e2009..3ab6ef96118 100644 --- a/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json @@ -1658,9 +1658,168 @@ } }, { - "comment": "Insert with on duplicate key update - foreign keys disallowed", + "comment": "Insert with on duplicate key update - foreign key with new value", "query": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = 5", - "plan": "VT12001: unsupported: ON DUPLICATE KEY UPDATE with foreign keys" + "plan": { + "QueryType": "INSERT", + "Original": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = 5", + "Instructions": { + "OperatorType": "Upsert", + "TargetTabletType": "PRIMARY", + "Inputs": [ + { + "InputName": "Insert-1", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + }, + { + "InputName": "Update-1", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col1 from u_tbl1 where 1 != 1", + "Query": "select col1 from u_tbl1 where id = 1 for update nowait", + "Table": "u_tbl1" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "FkCascade", + "BvName": "fkc_vals", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where (col2) in ::fkc_vals for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals1", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals1 and (col3) not in ((cast(5 as CHAR)))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=OFF) */ u_tbl2 set col2 = 5 where (col2) in ::fkc_vals", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "CascadeChild-2", + "OperatorType": "FkCascade", + "BvName": "fkc_vals2", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col9 from u_tbl9 where 1 != 1", + "Query": "select col9 from u_tbl9 where (col9) in ::fkc_vals2 and (col9) not in ((cast(5 as CHAR))) for update nowait", + "Table": "u_tbl9" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals3", + "Cols": [ + 0 + ], + "Query": "update u_tbl8 set col8 = null where (col8) in ::fkc_vals3", + "Table": "u_tbl8" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl9 set col9 = null where (col9) in ::fkc_vals2 and (col9) not in ((cast(5 as CHAR)))", + "Table": "u_tbl9" + } + ] + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl1 set col1 = 5 where id = 1", + "Table": "u_tbl1" + } + ] + } + ] + }, + "TablesUsed": [ + "unsharded_fk_allow.u_tbl1", + "unsharded_fk_allow.u_tbl2", + "unsharded_fk_allow.u_tbl3", + "unsharded_fk_allow.u_tbl8", + "unsharded_fk_allow.u_tbl9" + ] + } }, { "comment": "Insert with on duplicate key update - foreign keys not on update column - allowed", @@ -2511,5 +2670,364 @@ "sharded_fk_allow.tbl3" ] } + }, + { + "comment": "Insert with on duplicate key update - foreign key with values function", + "query": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = values(col1)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = values(col1)", + "Instructions": { + "OperatorType": "Upsert", + "TargetTabletType": "PRIMARY", + "Inputs": [ + { + "InputName": "Insert-1", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + }, + { + "InputName": "Update-1", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col1 from u_tbl1 where 1 != 1", + "Query": "select col1 from u_tbl1 where id = 1 for update nowait", + "Table": "u_tbl1" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "FkCascade", + "BvName": "fkc_vals", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where (col2) in ::fkc_vals for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals1", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals1 and (col3) not in ((cast(3 as CHAR)))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=OFF) */ u_tbl2 set col2 = 3 where (col2) in ::fkc_vals", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "CascadeChild-2", + "OperatorType": "FkCascade", + "BvName": "fkc_vals2", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col9 from u_tbl9 where 1 != 1", + "Query": "select col9 from u_tbl9 where (col9) in ::fkc_vals2 and (col9) not in ((cast(3 as CHAR))) for update nowait", + "Table": "u_tbl9" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals3", + "Cols": [ + 0 + ], + "Query": "update u_tbl8 set col8 = null where (col8) in ::fkc_vals3", + "Table": "u_tbl8" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl9 set col9 = null where (col9) in ::fkc_vals2 and (col9) not in ((cast(3 as CHAR)))", + "Table": "u_tbl9" + } + ] + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl1 set col1 = 3 where id = 1", + "Table": "u_tbl1" + } + ] + } + ] + }, + "TablesUsed": [ + "unsharded_fk_allow.u_tbl1", + "unsharded_fk_allow.u_tbl2", + "unsharded_fk_allow.u_tbl3", + "unsharded_fk_allow.u_tbl8", + "unsharded_fk_allow.u_tbl9" + ] + } + }, + { + "comment": "insert with on duplicate key update with multiple rows", + "query": "insert into u_tbl2 (id, col2) values (:v1, :v2),(:v3, :v4), (:v5, :v6) on duplicate key update col2 = values(col2)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into u_tbl2 (id, col2) values (:v1, :v2),(:v3, :v4), (:v5, :v6) on duplicate key update col2 = values(col2)", + "Instructions": { + "OperatorType": "Upsert", + "TargetTabletType": "PRIMARY", + "Inputs": [ + { + "InputName": "Insert-1", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl2(id, col2) values (:v1, :v2)", + "TableName": "u_tbl2" + }, + { + "InputName": "Update-1", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where id = :v1 for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals and (cast(:v2 as CHAR) is null or (col3) not in ((cast(:v2 as CHAR))))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl2 set col2 = :v2 where id = :v1", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "Insert-2", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl2(id, col2) values (:v3, :v4)", + "TableName": "u_tbl2" + }, + { + "InputName": "Update-2", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where id = :v3 for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals1", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals1 and (cast(:v4 as CHAR) is null or (col3) not in ((cast(:v4 as CHAR))))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl2 set col2 = :v4 where id = :v3", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "Insert-3", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert into u_tbl2(id, col2) values (:v5, :v6)", + "TableName": "u_tbl2" + }, + { + "InputName": "Update-3", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where id = :v5 for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals2", + "Cols": [ + 0 + ], + "Query": "update u_tbl3 set col3 = null where (col3) in ::fkc_vals2 and (cast(:v6 as CHAR) is null or (col3) not in ((cast(:v6 as CHAR))))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update u_tbl2 set col2 = :v6 where id = :v5", + "Table": "u_tbl2" + } + ] + } + ] + }, + "TablesUsed": [ + "unsharded_fk_allow.u_tbl2", + "unsharded_fk_allow.u_tbl3" + ] + } } ] diff --git a/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json b/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json index a61a035a8d8..e382a83a0ad 100644 --- a/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/foreignkey_checks_on_cases.json @@ -1660,7 +1660,166 @@ { "comment": "Insert with on duplicate key update - foreign keys disallowed", "query": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = 5", - "plan": "VT12001: unsupported: ON DUPLICATE KEY UPDATE with foreign keys" + "plan": { + "QueryType": "INSERT", + "Original": "insert into u_tbl1 (id, col1) values (1, 3) on duplicate key update col1 = 5", + "Instructions": { + "OperatorType": "Upsert", + "TargetTabletType": "PRIMARY", + "Inputs": [ + { + "InputName": "Insert-1", + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "NoAutoCommit": true, + "Query": "insert /*+ SET_VAR(foreign_key_checks=On) */ into u_tbl1(id, col1) values (1, 3)", + "TableName": "u_tbl1" + }, + { + "InputName": "Update-1", + "OperatorType": "FkCascade", + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col1 from u_tbl1 where 1 != 1", + "Query": "select col1 from u_tbl1 where id = 1 for update nowait", + "Table": "u_tbl1" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "FkCascade", + "BvName": "fkc_vals", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col2 from u_tbl2 where 1 != 1", + "Query": "select col2 from u_tbl2 where (col2) in ::fkc_vals for update nowait", + "Table": "u_tbl2" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals1", + "Cols": [ + 0 + ], + "Query": "update /*+ SET_VAR(foreign_key_checks=ON) */ u_tbl3 set col3 = null where (col3) in ::fkc_vals1 and (col3) not in ((cast(5 as CHAR)))", + "Table": "u_tbl3" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=OFF) */ u_tbl2 set col2 = 5 where (col2) in ::fkc_vals", + "Table": "u_tbl2" + } + ] + }, + { + "InputName": "CascadeChild-2", + "OperatorType": "FkCascade", + "BvName": "fkc_vals2", + "Cols": [ + 0 + ], + "Inputs": [ + { + "InputName": "Selection", + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "FieldQuery": "select col9 from u_tbl9 where 1 != 1", + "Query": "select col9 from u_tbl9 where (col9) in ::fkc_vals2 and (col9) not in ((cast(5 as CHAR))) for update nowait", + "Table": "u_tbl9" + }, + { + "InputName": "CascadeChild-1", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "BvName": "fkc_vals3", + "Cols": [ + 0 + ], + "Query": "update /*+ SET_VAR(foreign_key_checks=ON) */ u_tbl8 set col8 = null where (col8) in ::fkc_vals3", + "Table": "u_tbl8" + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=ON) */ u_tbl9 set col9 = null where (col9) in ::fkc_vals2 and (col9) not in ((cast(5 as CHAR)))", + "Table": "u_tbl9" + } + ] + }, + { + "InputName": "Parent", + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_fk_allow", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update /*+ SET_VAR(foreign_key_checks=On) */ u_tbl1 set col1 = 5 where id = 1", + "Table": "u_tbl1" + } + ] + } + ] + }, + "TablesUsed": [ + "unsharded_fk_allow.u_tbl1", + "unsharded_fk_allow.u_tbl2", + "unsharded_fk_allow.u_tbl3", + "unsharded_fk_allow.u_tbl8", + "unsharded_fk_allow.u_tbl9" + ] + } }, { "comment": "Insert with on duplicate key update - foreign keys not on update column - allowed", diff --git a/go/vt/vtgate/planbuilder/upsert.go b/go/vt/vtgate/planbuilder/upsert.go new file mode 100644 index 00000000000..cd9c127635c --- /dev/null +++ b/go/vt/vtgate/planbuilder/upsert.go @@ -0,0 +1,37 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package planbuilder + +import ( + "vitess.io/vitess/go/vt/vtgate/engine" +) + +type upsert struct { + insert []logicalPlan + update []logicalPlan +} + +var _ logicalPlan = (*upsert)(nil) + +// Primitive implements the logicalPlan interface +func (u *upsert) Primitive() engine.Primitive { + up := &engine.Upsert{} + for i := 0; i < len(u.insert); i++ { + up.AddUpsert(u.insert[i].Primitive(), u.update[i].Primitive()) + } + return up +}