Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flush tables with read lock to run only with reserved connection #14720

Merged
merged 6 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 32 additions & 32 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Send struct {
// MultishardAutocommit specifies that a multishard transaction query can autocommit
MultishardAutocommit bool

ReservedConnectionNeeded bool

noInputs
}

Expand Down Expand Up @@ -88,19 +90,12 @@ func (s *Send) GetTableName() string {
func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, 0)
defer cancelFunc()
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return nil, err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

queries := make([]*querypb.BoundQuery, len(rss))
for i, rs := range rss {
bv := bindVars
Expand All @@ -123,6 +118,26 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str
return result, nil
}

func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*srvtopo.ResolvedShard, error) {
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
if err != nil {
return nil, err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

if s.ReservedConnectionNeeded {
vcursor.Session().NeedsReservedConn()
}
return rss, nil
}

func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool {
if s.IsDML {
return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval()
Expand All @@ -140,19 +155,11 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV

// TryStreamExecute implements Primitive interface
func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

multiBindVars := make([]map[string]*querypb.BindVariable, len(rss))
for i, rs := range rss {
bv := bindVars
Expand All @@ -178,20 +185,13 @@ func (s *Send) GetFields(ctx context.Context, vcursor VCursor, bindVars map[stri

func (s *Send) description() PrimitiveDescription {
other := map[string]any{
"Query": s.Query,
"Table": s.GetTableName(),
}
if s.IsDML {
other["IsDML"] = true
}
if s.SingleShardOnly {
other["SingleShardOnly"] = true
}
if s.ShardNameNeeded {
other["ShardNameNeeded"] = true
}
if s.MultishardAutocommit {
other["MultishardAutocommit"] = true
"Query": s.Query,
"Table": s.GetTableName(),
"IsDML": s.IsDML,
"SingleShardOnly": s.SingleShardOnly,
"ShardNameNeeded": s.ShardNameNeeded,
"MultishardAutocommit": s.MultishardAutocommit,
"ReservedConnectionNeeded": s.ReservedConnectionNeeded,
}
return PrimitiveDescription{
OperatorType: "Send",
Expand Down
30 changes: 13 additions & 17 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,12 @@ func buildFlushOptions(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*pla
dest = key.DestinationAllShards{}
}

tc := &tableCollector{}
for _, tbl := range stmt.TableNames {
tc.addASTTable(keyspace.Name, tbl)
}

return newPlanResult(&engine.Send{
Keyspace: keyspace,
TargetDestination: dest,
Query: sqlparser.String(stmt),
IsDML: false,
SingleShardOnly: false,
}, tc.getTables()...), nil
Keyspace: keyspace,
TargetDestination: dest,
Query: sqlparser.String(stmt),
ReservedConnectionNeeded: stmt.WithLock,
}), nil
}

func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) {
Expand Down Expand Up @@ -441,9 +435,10 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
if len(tablesMap) == 1 {
for sendDest, tables := range tablesMap {
return newPlanResult(&engine.Send{
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
ReservedConnectionNeeded: stmt.WithLock,
}, tc.getTables()...), nil
}
}
Expand All @@ -455,9 +450,10 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
var sources []engine.Primitive
for _, sendDest := range keys {
plan := &engine.Send{
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])),
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])),
ReservedConnectionNeeded: stmt.WithLock,
}
sources = append(sources, plan)
}
Expand Down
40 changes: 39 additions & 1 deletion go/vt/vtgate/planbuilder/testdata/flush_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush local tables with read lock"
"Query": "flush local tables with read lock",
"ReservedConnectionNeeded": true
}
}
},
Expand All @@ -53,5 +54,42 @@
"Query": "flush local hosts, logs"
}
}
},
{
"comment": "Flush statement with multiple tables in different keyspace with read lock",
"query": "flush tables user.music, main.unsharded with read lock",
"plan": {
"QueryType": "FLUSH",
"Original": "flush tables user.music, main.unsharded with read lock",
"Instructions": {
"OperatorType": "Concatenate",
"Inputs": [
{
"OperatorType": "Send",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush tables unsharded with read lock",
"ReservedConnectionNeeded": true
},
{
"OperatorType": "Send",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"TargetDestination": "AllShards()",
"Query": "flush tables music with read lock",
"ReservedConnectionNeeded": true
}
]
},
"TablesUsed": [
"main.unsharded",
"user.music"
]
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush local tables unsharded_a with read lock"
"Query": "flush local tables unsharded_a with read lock",
"ReservedConnectionNeeded": true
},
{
"OperatorType": "Send",
Expand All @@ -24,7 +25,8 @@
"Sharded": true
},
"TargetDestination": "AllShards()",
"Query": "flush local tables `user`, user_extra with read lock"
"Query": "flush local tables `user`, user_extra with read lock",
"ReservedConnectionNeeded": true
}
]
},
Expand Down Expand Up @@ -105,7 +107,8 @@
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush tables a with read lock"
"Query": "flush tables a with read lock",
"ReservedConnectionNeeded": true
},
"TablesUsed": [
"main.a"
Expand All @@ -128,7 +131,8 @@
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush local tables unsharded_a with read lock"
"Query": "flush local tables unsharded_a with read lock",
"ReservedConnectionNeeded": true
},
{
"OperatorType": "Send",
Expand All @@ -137,7 +141,8 @@
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush local tables unsharded_tab with read lock"
"Query": "flush local tables unsharded_tab with read lock",
"ReservedConnectionNeeded": true
},
{
"OperatorType": "Send",
Expand All @@ -146,7 +151,8 @@
"Sharded": true
},
"TargetDestination": "AllShards()",
"Query": "flush local tables `user`, user_extra with read lock"
"Query": "flush local tables `user`, user_extra with read lock",
"ReservedConnectionNeeded": true
}
]
},
Expand Down
15 changes: 13 additions & 2 deletions go/vt/vttablet/endtoend/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
)

//TODO: Add Counter checks in all the tests.

func TestMultipleReserveHaveDifferentConnection(t *testing.T) {
framework.Server.Config().EnableSettingsPool = false
defer func() {
Expand Down Expand Up @@ -1190,3 +1188,16 @@ func TestReserveQueryTimeout(t *testing.T) {
assert.NoError(t,
client.Release())
}

// TestReserveFlushTables checks that `flush table with read lock` works only with reserve api.
func TestReserveFlushTables(t *testing.T) {
client := framework.NewClient()

_, err := client.Execute("flush tables with read lock", nil)
assert.ErrorContains(t, err, "Flush not allowed without reserved connection")

_, err = client.ReserveExecute("flush tables with read lock", nil, nil)
assert.NoError(t, err)
assert.NoError(t,
client.Release())
}
18 changes: 18 additions & 0 deletions go/vt/vttablet/tabletserver/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,21 @@ func analyzeDDL(stmt sqlparser.DDLStatement) (*Plan, error) {
}
return &Plan{PlanID: PlanDDL, FullQuery: fullQuery, FullStmt: stmt, NeedsReservedConn: stmt.IsTemporary()}, nil
}

func analyzeFlush(stmt *sqlparser.Flush, tables map[string]*schema.Table) (*Plan, error) {
plan := &Plan{PlanID: PlanFlush, FullQuery: GenerateFullQuery(stmt)}

for _, tbl := range stmt.TableNames {
if schemaTbl, ok := tables[tbl.Name.String()]; ok {
plan.AllTables = append(plan.AllTables, schemaTbl)
}
}
if len(plan.AllTables) == 1 {
plan.Table = plan.AllTables[0]
}

if stmt.WithLock {
plan.NeedsReservedConn = true
}
return plan, nil
}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/planbuilder/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func Build(statement sqlparser.Statement, tables map[string]*schema.Table, dbNam
case *sqlparser.Load:
plan, err = &Plan{PlanID: PlanLoad}, nil
case *sqlparser.Flush:
plan, err = &Plan{PlanID: PlanFlush, FullQuery: GenerateFullQuery(stmt)}, nil
plan, err = analyzeFlush(stmt, tables)
case *sqlparser.CallProc:
plan, err = &Plan{PlanID: PlanCallProc, FullQuery: GenerateFullQuery(stmt)}, nil
default:
Expand Down
19 changes: 19 additions & 0 deletions go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,25 @@ options:PassthroughDMLs
"FullQuery": "flush tables a, b"
}

# flush statement with read lock
"flush tables a,b with read lock"
{
"PlanID": "Flush",
"TableName": "",
"Permissions": [
{
"TableName": "a",
"Role": 2
},
{
"TableName": "b",
"Role": 2
}
],
"FullQuery": "flush tables a, b with read lock",
"NeedsReservedConn": true
}

# call proc
"call getAllTheThings()"
{
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (ep *TabletPlan) IsValid(hasReservedCon, hasSysSettings bool) error {

func isValid(planType planbuilder.PlanType, hasReservedCon bool, hasSysSettings bool) error {
switch planType {
case planbuilder.PlanSelectLockFunc, planbuilder.PlanDDL:
case planbuilder.PlanSelectLockFunc, planbuilder.PlanDDL, planbuilder.PlanFlush:
if hasReservedCon {
return nil
}
Expand Down
Loading