Skip to content

Commit

Permalink
Update with Limit Plan (#15107)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
Signed-off-by: Andres Taylor <[email protected]>
Co-authored-by: Andres Taylor <[email protected]>
  • Loading branch information
harshit-gangal and systay authored Feb 12, 2024
1 parent aeb008c commit 14473b9
Show file tree
Hide file tree
Showing 25 changed files with 890 additions and 458 deletions.
64 changes: 64 additions & 0 deletions go/test/endtoend/vtgate/queries/dml/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,67 @@ func TestDeleteWithLimit(t *testing.T) {
require.EqualValues(t, 0, qr.RowsAffected)

}

// TestUpdateWithLimit executed update queries with limit
func TestUpdateWithLimit(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 20, "vtgate")

mcmp, closer := start(t)
defer closer()

// initial rows
mcmp.Exec("insert into s_tbl(id, num) values (1,10), (2,10), (3,10), (4,20), (5,5), (6,15), (7,17), (8,80)")
mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)")

// check rows
mcmp.AssertMatches(`select id, num from s_tbl order by id`,
`[[INT64(1) INT64(10)] [INT64(2) INT64(10)] [INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(5) INT64(5)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`)
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)

// update with limit
qr := mcmp.Exec(`update s_tbl set num = 12 order by num, id limit 3`)
require.EqualValues(t, 3, qr.RowsAffected)

qr = mcmp.Exec(`update order_tbl set cust_no = 12 where region_id = 1 limit 1`)
require.EqualValues(t, 1, qr.RowsAffected)

// check rows
mcmp.AssertMatches(`select id, num from s_tbl order by id`,
`[[INT64(1) INT64(12)] [INT64(2) INT64(12)] [INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(5) INT64(12)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`)
// 2 rows matches but limit is 1, so any one of the row can be modified in the table.
mcmp.AssertMatchesAnyNoCompare(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(12)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(12)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)

// update with limit
qr = mcmp.Exec(`update s_tbl set num = 32 where num > 17 limit 1`)
require.EqualValues(t, 1, qr.RowsAffected)

qr = mcmp.Exec(`update order_tbl set cust_no = cust_no + 10 limit 5`)
require.EqualValues(t, 4, qr.RowsAffected)

// check rows
// 2 rows matches `num > 17` but limit is 1 so any one of them will be updated.
mcmp.AssertMatchesAnyNoCompare(`select id, num from s_tbl order by id`,
`[[INT64(1) INT64(12)] [INT64(2) INT64(12)] [INT64(3) INT64(10)] [INT64(4) INT64(32)] [INT64(5) INT64(12)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`,
`[[INT64(1) INT64(12)] [INT64(2) INT64(12)] [INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(5) INT64(12)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(32)]]`)
mcmp.AssertMatchesAnyNoCompare(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(22)] [INT64(1) INT64(2) INT64(12)] [INT64(2) INT64(3) INT64(15)] [INT64(2) INT64(4) INT64(65)]]`,
`[[INT64(1) INT64(1) INT64(14)] [INT64(1) INT64(2) INT64(22)] [INT64(2) INT64(3) INT64(15)] [INT64(2) INT64(4) INT64(65)]]`)

// trying with zero limit.
qr = mcmp.Exec(`update s_tbl set num = 44 limit 0`)
require.EqualValues(t, 0, qr.RowsAffected)

qr = mcmp.Exec(`update order_tbl set oid = 44 limit 0`)
require.EqualValues(t, 0, qr.RowsAffected)

// trying with limit with no-matching row.
qr = mcmp.Exec(`update s_tbl set num = 44 where id > 100 limit 2`)
require.EqualValues(t, 0, qr.RowsAffected)

qr = mcmp.Exec(`update order_tbl set oid = 44 where region_id > 100 limit 2`)
require.EqualValues(t, 0, qr.RowsAffected)

}
2 changes: 0 additions & 2 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ var (
VT09021 = errorWithState("VT09021", vtrpcpb.Code_FAILED_PRECONDITION, KeyDoesNotExist, "Vindex '%s' does not exist in table '%s'", "Vindex hints have to reference an existing vindex, and no such vindex could be found for the given table.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "'replace into' with foreign key constraints are not allowed", "Foreign key constraints sometimes are not written in binary logs and will cause issue with vreplication workflows like online-ddl.")

VT12001 = errorWithoutState("VT12001", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: %s", "This statement is unsupported by Vitess. Please rewrite your query to use supported syntax.")
VT12002 = errorWithoutState("VT12002", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard foreign keys", "Vitess does not support cross shard foreign keys.")
Expand Down Expand Up @@ -171,7 +170,6 @@ var (
VT09018,
VT09019,
VT10001,
VT10002,
VT12001,
VT12002,
VT13001,
Expand Down
30 changes: 15 additions & 15 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,39 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
)

var _ Primitive = (*DeleteWithInput)(nil)
var _ Primitive = (*DMLWithInput)(nil)

const DmVals = "dm_vals"
const DmlVals = "dml_vals"

// DeleteWithInput represents the instructions to perform a delete operation based on the input result.
type DeleteWithInput struct {
// DMLWithInput represents the instructions to perform a DML operation based on the input result.
type DMLWithInput struct {
txNeeded

Delete Primitive
Input Primitive
DML Primitive
Input Primitive

OutputCols []int
}

func (del *DeleteWithInput) RouteType() string {
return "DeleteWithInput"
func (dml *DMLWithInput) RouteType() string {
return "DMLWithInput"
}

func (del *DeleteWithInput) GetKeyspaceName() string {
return del.Input.GetKeyspaceName()
func (dml *DMLWithInput) GetKeyspaceName() string {
return dml.Input.GetKeyspaceName()
}

func (del *DeleteWithInput) GetTableName() string {
return del.Input.GetTableName()
func (dml *DMLWithInput) GetTableName() string {
return dml.Input.GetTableName()
}

func (del *DeleteWithInput) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{del.Input, del.Delete}, nil
func (dml *DMLWithInput) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{dml.Input, dml.DML}, nil
}

// TryExecute performs a non-streaming exec.
func (del *DeleteWithInput) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
inputRes, err := vcursor.ExecutePrimitive(ctx, del.Input, bindVars, false)
func (dml *DMLWithInput) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
inputRes, err := vcursor.ExecutePrimitive(ctx, dml.Input, bindVars, false)
if err != nil {
return nil, err
}
Expand All @@ -67,14 +67,14 @@ func (del *DeleteWithInput) TryExecute(ctx context.Context, vcursor VCursor, bin
}

var bv *querypb.BindVariable
if len(del.OutputCols) == 1 {
bv = getBVSingle(inputRes, del.OutputCols[0])
if len(dml.OutputCols) == 1 {
bv = getBVSingle(inputRes, dml.OutputCols[0])
} else {
bv = getBVMulti(inputRes, del.OutputCols)
bv = getBVMulti(inputRes, dml.OutputCols)
}

bindVars[DmVals] = bv
return vcursor.ExecutePrimitive(ctx, del.Delete, bindVars, false)
bindVars[DmlVals] = bv
return vcursor.ExecutePrimitive(ctx, dml.DML, bindVars, false)
}

func getBVSingle(res *sqltypes.Result, offset int) *querypb.BindVariable {
Expand All @@ -99,25 +99,25 @@ func getBVMulti(res *sqltypes.Result, offsets []int) *querypb.BindVariable {
}

// TryStreamExecute performs a streaming exec.
func (del *DeleteWithInput) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
res, err := del.TryExecute(ctx, vcursor, bindVars, wantfields)
func (dml *DMLWithInput) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
res, err := dml.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return err
}
return callback(res)
}

// GetFields fetches the field info.
func (del *DeleteWithInput) GetFields(context.Context, VCursor, map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.VT13001("unreachable code for MULTI DELETE")
func (dml *DMLWithInput) GetFields(context.Context, VCursor, map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.VT13001("unreachable code for DMLs")
}

func (del *DeleteWithInput) description() PrimitiveDescription {
func (dml *DMLWithInput) description() PrimitiveDescription {
other := map[string]any{
"Offset": del.OutputCols,
"Offset": dml.OutputCols,
}
return PrimitiveDescription{
OperatorType: "DeleteWithInput",
OperatorType: "DMLWithInput",
TargetTabletType: topodatapb.TabletType_PRIMARY,
Other: other,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func TestDeleteWithInputSingleOffset(t *testing.T) {
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id", "int64"), "1", "2", "3"),
}}

del := &DeleteWithInput{
del := &DMLWithInput{
Input: input,
Delete: &Delete{
DML: &Delete{
DML: &DML{
RoutingParameters: &RoutingParameters{
Opcode: Scatter,
Expand All @@ -55,8 +55,8 @@ func TestDeleteWithInputSingleOffset(t *testing.T) {
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ` +
`ks.-20: dummy_delete {dm_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} ` +
`ks.20-: dummy_delete {dm_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true false`,
`ks.-20: dummy_delete {dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} ` +
`ks.20-: dummy_delete {dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true false`,
})

vc.Rewind()
Expand All @@ -66,8 +66,8 @@ func TestDeleteWithInputSingleOffset(t *testing.T) {
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ` +
`ks.-20: dummy_delete {dm_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} ` +
`ks.20-: dummy_delete {dm_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true false`,
`ks.-20: dummy_delete {dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} ` +
`ks.20-: dummy_delete {dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true false`,
})
}

Expand All @@ -76,9 +76,9 @@ func TestDeleteWithInputMultiOffset(t *testing.T) {
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col", "int64|varchar"), "1|a", "2|b", "3|c"),
}}

del := &DeleteWithInput{
del := &DMLWithInput{
Input: input,
Delete: &Delete{
DML: &Delete{
DML: &DML{
RoutingParameters: &RoutingParameters{
Opcode: Scatter,
Expand All @@ -99,8 +99,8 @@ func TestDeleteWithInputMultiOffset(t *testing.T) {
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ` +
`ks.-20: dummy_delete {dm_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} ` +
`ks.20-: dummy_delete {dm_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} true false`,
`ks.-20: dummy_delete {dml_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} ` +
`ks.20-: dummy_delete {dml_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} true false`,
})

vc.Rewind()
Expand All @@ -110,7 +110,7 @@ func TestDeleteWithInputMultiOffset(t *testing.T) {
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ` +
`ks.-20: dummy_delete {dm_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} ` +
`ks.20-: dummy_delete {dm_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} true false`,
`ks.-20: dummy_delete {dml_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} ` +
`ks.20-: dummy_delete {dml_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} true false`,
})
}
12 changes: 6 additions & 6 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3041,14 +3041,14 @@ func TestDeleteMultiTable(t *testing.T) {
wantQueries := []*querypb.BoundQuery{
{Sql: "select `user`.id, `user`.col from `user`", BindVariables: map[string]*querypb.BindVariable{}},
bq, bq, bq, bq, bq, bq, bq, bq,
{Sql: "select `user`.Id, `user`.`name` from `user` where `user`.id in ::dm_vals for update", BindVariables: map[string]*querypb.BindVariable{"dm_vals": {Type: querypb.Type_TUPLE, Values: dmlVals}}},
{Sql: "delete from `user` where `user`.id in ::dm_vals", BindVariables: map[string]*querypb.BindVariable{"dm_vals": {Type: querypb.Type_TUPLE, Values: dmlVals}}}}
{Sql: "select `user`.Id, `user`.`name` from `user` where `user`.id in ::dml_vals for update", BindVariables: map[string]*querypb.BindVariable{"dml_vals": {Type: querypb.Type_TUPLE, Values: dmlVals}}},
{Sql: "delete from `user` where `user`.id in ::dml_vals", BindVariables: map[string]*querypb.BindVariable{"dml_vals": {Type: querypb.Type_TUPLE, Values: dmlVals}}}}
assertQueries(t, sbc1, wantQueries)

wantQueries = []*querypb.BoundQuery{
{Sql: "select `user`.id, `user`.col from `user`", BindVariables: map[string]*querypb.BindVariable{}},
{Sql: "select `user`.Id, `user`.`name` from `user` where `user`.id in ::dm_vals for update", BindVariables: map[string]*querypb.BindVariable{"dm_vals": {Type: querypb.Type_TUPLE, Values: dmlVals}}},
{Sql: "delete from `user` where `user`.id in ::dm_vals", BindVariables: map[string]*querypb.BindVariable{"dm_vals": {Type: querypb.Type_TUPLE, Values: dmlVals}}},
{Sql: "select `user`.Id, `user`.`name` from `user` where `user`.id in ::dml_vals for update", BindVariables: map[string]*querypb.BindVariable{"dml_vals": {Type: querypb.Type_TUPLE, Values: dmlVals}}},
{Sql: "delete from `user` where `user`.id in ::dml_vals", BindVariables: map[string]*querypb.BindVariable{"dml_vals": {Type: querypb.Type_TUPLE, Values: dmlVals}}},
}
assertQueries(t, sbc2, wantQueries)

Expand All @@ -3067,7 +3067,7 @@ func TestDeleteMultiTable(t *testing.T) {
testQueryLog(t, executor, logChan, "VindexDelete", "DELETE", "delete from name_user_map where `name` = :name and user_id = :user_id", 1)
// select `user`.id, `user`.col from `user` - 8 shard
// select 1 from music where music.user_id = 1 and music.col = :user_col - 8 shards
// select Id, `name` from `user` where (`user`.id) in ::dm_vals for update - 1 shard
// delete from `user` where (`user`.id) in ::dm_vals - 1 shard
// select Id, `name` from `user` where (`user`.id) in ::dml_vals for update - 1 shard
// delete from `user` where (`user`.id) in ::dml_vals - 1 shard
testQueryLog(t, executor, logChan, "TestExecute", "DELETE", "delete `user` from `user` join music on `user`.col = music.col where music.user_id = 1", 18)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ import (
"vitess.io/vitess/go/vt/vtgate/engine"
)

type deleteWithInput struct {
input logicalPlan
delete logicalPlan
type dmlWithInput struct {
input logicalPlan
dml logicalPlan

outputCols []int
}

var _ logicalPlan = (*deleteWithInput)(nil)
var _ logicalPlan = (*dmlWithInput)(nil)

// Primitive implements the logicalPlan interface
func (d *deleteWithInput) Primitive() engine.Primitive {
func (d *dmlWithInput) Primitive() engine.Primitive {
inp := d.input.Primitive()
del := d.delete.Primitive()
return &engine.DeleteWithInput{
Delete: del,
del := d.dml.Primitive()
return &engine.DMLWithInput{
DML: del,
Input: inp,
OutputCols: d.outputCols,
}
Expand Down
Loading

0 comments on commit 14473b9

Please sign in to comment.