Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support cross shard DELETE with LIMIT/ORDER BY #14959

Merged
merged 8 commits into from
Jan 24, 2024
Prev Previous commit
Next Next commit
fix: delete with input to have offset to select only required columns…
… for sending as bindvars, added e2e tests

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Jan 24, 2024
commit 28a8d5886a9391f60e98055a9ee2974fa5c88de2
50 changes: 50 additions & 0 deletions go/test/endtoend/vtgate/queries/dml/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/utils"
)
Expand Down Expand Up @@ -78,3 +79,52 @@ func TestMultiTableDelete(t *testing.T) {
mcmp.AssertMatches(`select oid, ename from oevent_tbl order by oid`,
`[[INT64(1) VARCHAR("a")] [INT64(2) VARCHAR("b")] [INT64(3) VARCHAR("a")] [INT64(4) VARCHAR("c")]]`)
}

// TestDeleteWithLimit executed delete queries with limit
func TestDeleteWithLimit(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 19, "vtgate")

harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
mcmp, closer := start(t)
defer closer()

// initial rows
mcmp.Exec("insert into s_tbl(id, num) values (1,10), (2,10), (3,10), (4,20), (5,5), (6,15), (7,17), (8,80)")
mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)")

// check rows
mcmp.AssertMatches(`select id, num from s_tbl order by id`,
`[[INT64(1) INT64(10)] [INT64(2) INT64(10)] [INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(5) INT64(5)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`)
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)

// delete with limit
qr := mcmp.Exec(`delete from s_tbl order by num, id limit 3`)
require.EqualValues(t, 3, qr.RowsAffected)

qr = mcmp.Exec(`delete from order_tbl where region_id = 1 limit 1`)
require.EqualValues(t, 1, qr.RowsAffected)

// check rows
mcmp.AssertMatches(`select id, num from s_tbl order by id`,
`[[INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(6) INT64(15)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`)
// 2 rows matches but limit is 1, so any one of the row can remain in table.
mcmp.AssertMatchesAnyNoCompare(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)

// delete with limit
qr = mcmp.Exec(`delete from s_tbl where num < 20 limit 2`)
require.EqualValues(t, 2, qr.RowsAffected)

qr = mcmp.Exec(`delete from order_tbl limit 5`)
require.EqualValues(t, 3, qr.RowsAffected)

// check rows
// 3 rows matches `num < 20` but limit is 2 so any one of them can remain in the table.
mcmp.AssertMatchesAnyNoCompare(`select id, num from s_tbl order by id`,
`[[INT64(4) INT64(20)] [INT64(7) INT64(17)] [INT64(8) INT64(80)]]`,
`[[INT64(3) INT64(10)] [INT64(4) INT64(20)] [INT64(8) INT64(80)]]`,
`[[INT64(4) INT64(20)] [INT64(6) INT64(15)] [INT64(8) INT64(80)]]`)
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[]`)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type DeleteWithInput struct {
Delete Primitive
Input Primitive

OutputCols []int

txNeeded
}

Expand Down Expand Up @@ -64,12 +66,16 @@ func (del *DeleteWithInput) TryExecute(ctx context.Context, vcursor VCursor, bin
bv := &querypb.BindVariable{
Type: querypb.Type_TUPLE,
}
outputVals := make([]sqltypes.Value, 0, len(del.OutputCols))
for _, row := range inputRes.Rows {
bv.Values = append(bv.Values, sqltypes.TupleToProto(row))
for _, offset := range del.OutputCols {
outputVals = append(outputVals, row[offset])
}
bv.Values = append(bv.Values, sqltypes.TupleToProto(outputVals))
outputVals = outputVals[:0]
}
return vcursor.ExecutePrimitive(ctx, del.Delete, map[string]*querypb.BindVariable{
DM_VALS: bv,
}, false)
bindVars[DM_VALS] = bv
return vcursor.ExecutePrimitive(ctx, del.Delete, bindVars, false)
}

// TryStreamExecute performs a streaming exec.
Expand All @@ -87,8 +93,12 @@ func (del *DeleteWithInput) GetFields(context.Context, VCursor, map[string]*quer
}

func (del *DeleteWithInput) description() PrimitiveDescription {
other := map[string]any{
"Offset": del.OutputCols,
}
return PrimitiveDescription{
OperatorType: "DeleteWithInput",
TargetTabletType: topodatapb.TabletType_PRIMARY,
Other: other,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

func TestDeleteMulti(t *testing.T) {
func TestDeleteWithInput(t *testing.T) {
input := &fakePrimitive{results: []*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id", "int64"), "1", "2", "3"),
}}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3018,11 +3018,11 @@ func TestInsertReference(t *testing.T) {
require.NoError(t, err) // Gen4 planner can redirect the query to correct source for update when reference table is involved.
}

func TestDeleteMulti(t *testing.T) {
func TestDeleteMultiTable(t *testing.T) {
executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnv(t)
executor.vschema.Keyspaces["TestExecutor"].Tables["user"].PrimaryKey = sqlparser.Columns{sqlparser.NewIdentifierCI("id")}

logChan := executor.queryLogger.Subscribe("TestDeleteMulti")
logChan := executor.queryLogger.Subscribe("TestDeleteMultiTable")
defer executor.queryLogger.Unsubscribe(logChan)

session := &vtgatepb.Session{TargetString: "@primary"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,22 @@ import (
"vitess.io/vitess/go/vt/vtgate/engine"
)

type deleteMulti struct {
type deleteWithInput struct {
input logicalPlan
delete logicalPlan

outputCols []int
}

var _ logicalPlan = (*deleteMulti)(nil)
var _ logicalPlan = (*deleteWithInput)(nil)

// Primitive implements the logicalPlan interface
func (d *deleteMulti) Primitive() engine.Primitive {
func (d *deleteWithInput) Primitive() engine.Primitive {
inp := d.input.Primitive()
del := d.delete.Primitive()
return &engine.DeleteWithInput{
Delete: del,
Input: inp,
Delete: del,
Input: inp,
OutputCols: d.outputCols,
}
}
11 changes: 6 additions & 5 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Opera
case *operators.Sequential:
return transformSequential(ctx, op)
case *operators.DeleteWithInput:
return transformDeleteMulti(ctx, op)
return transformDeleteWithInput(ctx, op)
}

return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op))
}

func transformDeleteMulti(ctx *plancontext.PlanningContext, op *operators.DeleteWithInput) (logicalPlan, error) {
func transformDeleteWithInput(ctx *plancontext.PlanningContext, op *operators.DeleteWithInput) (logicalPlan, error) {
input, err := transformToLogicalPlan(ctx, op.Source)
if err != nil {
return nil, err
Expand All @@ -91,9 +91,10 @@ func transformDeleteMulti(ctx *plancontext.PlanningContext, op *operators.Delete
if err != nil {
return nil, err
}
return &deleteMulti{
input: input,
delete: del,
return &deleteWithInput{
input: input,
delete: del,
outputCols: op.Offsets,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ limitations under the License.

package operators

import "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
import (
"fmt"

"vitess.io/vitess/go/slice"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

type DeleteWithInput struct {
Source Operator
Delete Operator

cols []*sqlparser.ColName
Offsets []int

noColumns
noPredicates
}
Expand All @@ -45,11 +54,30 @@ func (d *DeleteWithInput) SetInputs(inputs []Operator) {
}

func (d *DeleteWithInput) ShortDescription() string {
return ""
colStrings := slice.Map(d.cols, func(from *sqlparser.ColName) string {
return sqlparser.String(from)
})
out := ""
for idx, colString := range colStrings {
out += colString
if len(d.Offsets) > idx {
out += fmt.Sprintf(":%d", d.Offsets[idx])
}
out += " "
}
return out
}

func (d *DeleteWithInput) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
return nil
}

func (d *DeleteWithInput) planOffsets(ctx *plancontext.PlanningContext) Operator {
for _, col := range d.cols {
offset := d.Source.AddColumn(ctx, true, false, aeWrap(col))
d.Offsets = append(d.Offsets, offset)
}
return d
}

var _ Operator = (*DeleteWithInput)(nil)
3 changes: 2 additions & 1 deletion go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func createDeleteWithInput(ctx *plancontext.PlanningContext, in *Delete, src Ope
proj := newAliasedProjection(src)
for _, col := range in.Target.VTable.PrimaryKey {
colName := sqlparser.NewColNameWithQualifier(col.String(), in.Target.Name)
proj.AddColumn(ctx, true, false, sqlparser.NewAliasedExpr(colName, ""))
proj.AddColumn(ctx, true, false, aeWrap(colName))
dm.cols = append(dm.cols, colName)
leftComp = append(leftComp, colName)
ctx.SemTable.Recursive[colName] = in.Target.ID
}
Expand Down
74 changes: 74 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/dml_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -5057,6 +5057,9 @@
"Instructions": {
"OperatorType": "DeleteWithInput",
"TargetTabletType": "PRIMARY",
"Offset": [
0
],
"Inputs": [
{
"OperatorType": "Join",
Expand Down Expand Up @@ -5130,6 +5133,9 @@
"Instructions": {
"OperatorType": "DeleteWithInput",
"TargetTabletType": "PRIMARY",
"Offset": [
0
],
"Inputs": [
{
"OperatorType": "Join",
Expand Down Expand Up @@ -5199,6 +5205,9 @@
"Instructions": {
"OperatorType": "DeleteWithInput",
"TargetTabletType": "PRIMARY",
"Offset": [
0
],
"Inputs": [
{
"OperatorType": "Join",
Expand Down Expand Up @@ -5294,6 +5303,9 @@
"Instructions": {
"OperatorType": "DeleteWithInput",
"TargetTabletType": "PRIMARY",
"Offset": [
0
],
"Inputs": [
{
"OperatorType": "Join",
Expand Down Expand Up @@ -5364,6 +5376,9 @@
"Instructions": {
"OperatorType": "DeleteWithInput",
"TargetTabletType": "PRIMARY",
"Offset": [
0
],
"Inputs": [
{
"OperatorType": "Join",
Expand Down Expand Up @@ -5434,6 +5449,9 @@
"Instructions": {
"OperatorType": "DeleteWithInput",
"TargetTabletType": "PRIMARY",
"Offset": [
0
],
"Inputs": [
{
"OperatorType": "Limit",
Expand Down Expand Up @@ -5476,5 +5494,61 @@
"user.user"
]
}
},
{
"comment": "sharded delete with order by and limit clause",
"query": "delete from user order by name, col limit 5",
"plan": {
"QueryType": "DELETE",
"Original": "delete from user order by name, col limit 5",
"Instructions": {
"OperatorType": "DeleteWithInput",
"TargetTabletType": "PRIMARY",
"Offset": [
0
],
"Inputs": [
{
"OperatorType": "Limit",
"Count": "5",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select `user`.id, `name`, weight_string(`name`), col from `user` where 1 != 1",
"OrderBy": "(1|2) ASC, 3 ASC",
"Query": "select `user`.id, `name`, weight_string(`name`), col from `user` order by `name` asc, col asc limit :__upper_limit",
"Table": "`user`"
}
]
},
{
"OperatorType": "Delete",
"Variant": "MultiEqual",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"TargetTabletType": "PRIMARY",
"KsidLength": 1,
"KsidVindex": "user_index",
"OwnedVindexQuery": "select Id, `Name`, Costly from `user` where (`user`.id) in ::dm_vals order by `name` asc, col asc limit 5 for update",
"Query": "delete from `user` where (`user`.id) in ::dm_vals",
"Table": "user",
"Values": [
"dm_vals:0"
],
"Vindex": "user_index"
}
]
},
"TablesUsed": [
"user.user"
]
}
}
]