Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flush tables with read lock to run only with reserved connection #14720

Merged
merged 6 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package vtgate

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/test/endtoend/utils"

Expand Down Expand Up @@ -336,6 +340,52 @@ func TestFlush(t *testing.T) {
utils.Exec(t, conn, "flush local tables t1, t2")
}

// TestFlushLock tests that ftwrl and unlock tables should unblock other session connections to execute the query.
func TestFlushLock(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment describing what you are testing here?

conn, closer := start(t)
defer closer()

// replica: fail it
utils.Exec(t, conn, "use @replica")
_, err := utils.ExecAllowError(t, conn, "flush tables t1, t2 with read lock")
require.ErrorContains(t, err, "VT09012: FLUSH statement with REPLICA tablet not allowed")

// primary: should work
utils.Exec(t, conn, "use @primary")
utils.Exec(t, conn, "flush tables t1, t2 with read lock")

var cnt atomic.Int32
go func() {
ctx := context.Background()
conn2, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn2.Close()

cnt.Add(1)
utils.Exec(t, conn2, "select * from t1 for update")
cnt.Add(1)
}()
for cnt.Load() == 0 {
}
// added sleep to let the query execute inside the go routine, which should be blocked.
time.Sleep(1 * time.Second)
require.EqualValues(t, 1, cnt.Load())

// unlock it
utils.Exec(t, conn, "unlock tables")

// now wait for go routine to complete.
timeout := time.After(3 * time.Second)
for cnt.Load() != 2 {
select {
case <-timeout:
t.Fatalf("test timeout waiting for select query to complete")
default:

}
}
}

func TestShowVariables(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
64 changes: 32 additions & 32 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Send struct {
// MultishardAutocommit specifies that a multishard transaction query can autocommit
MultishardAutocommit bool

ReservedConnectionNeeded bool

noInputs
}

Expand Down Expand Up @@ -88,19 +90,12 @@ func (s *Send) GetTableName() string {
func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, 0)
defer cancelFunc()
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return nil, err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

queries := make([]*querypb.BoundQuery, len(rss))
for i, rs := range rss {
bv := bindVars
Expand All @@ -123,6 +118,26 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str
return result, nil
}

func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*srvtopo.ResolvedShard, error) {
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
if err != nil {
return nil, err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

if s.ReservedConnectionNeeded {
vcursor.Session().NeedsReservedConn()
}
return rss, nil
}

func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool {
if s.IsDML {
return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval()
Expand All @@ -140,19 +155,11 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV

// TryStreamExecute implements Primitive interface
func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

multiBindVars := make([]map[string]*querypb.BindVariable, len(rss))
for i, rs := range rss {
bv := bindVars
Expand All @@ -178,20 +185,13 @@ func (s *Send) GetFields(ctx context.Context, vcursor VCursor, bindVars map[stri

func (s *Send) description() PrimitiveDescription {
other := map[string]any{
"Query": s.Query,
"Table": s.GetTableName(),
}
if s.IsDML {
other["IsDML"] = true
}
if s.SingleShardOnly {
other["SingleShardOnly"] = true
}
if s.ShardNameNeeded {
other["ShardNameNeeded"] = true
}
if s.MultishardAutocommit {
other["MultishardAutocommit"] = true
"Query": s.Query,
"Table": s.GetTableName(),
"IsDML": s.IsDML,
"SingleShardOnly": s.SingleShardOnly,
"ShardNameNeeded": s.ShardNameNeeded,
"MultishardAutocommit": s.MultishardAutocommit,
"ReservedConnectionNeeded": s.ReservedConnectionNeeded,
}
return PrimitiveDescription{
OperatorType: "Send",
Expand Down
79 changes: 79 additions & 0 deletions go/vt/vtgate/engine/unlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2020 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vterrors"
)

var _ Primitive = (*Unlock)(nil)

// Unlock primitive will execute unlock tables to all connections in the session.
type Unlock struct {
noTxNeeded
noInputs
}

const unlockTables = "unlock tables"

func (u *Unlock) RouteType() string {
return "UNLOCK"
}

func (u *Unlock) GetKeyspaceName() string {
return ""
}

func (u *Unlock) GetTableName() string {
return ""
}

func (u *Unlock) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.VT13001("GetFields should not be called for unlock tables")
}

func (u *Unlock) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
rss := vcursor.Session().ShardSession()

if len(rss) == 0 {
return &sqltypes.Result{}, nil
}
bqs := make([]*querypb.BoundQuery, len(rss))
for i := 0; i < len(rss); i++ {
bqs[i] = &querypb.BoundQuery{Sql: unlockTables}
}
qr, errs := vcursor.ExecuteMultiShard(ctx, u, rss, bqs, true, false)
return qr, vterrors.Aggregate(errs)
}

func (u *Unlock) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
qr, err := u.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return err
}
return callback(qr)
}

func (u *Unlock) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "UnlockTables",
}
}
30 changes: 13 additions & 17 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,12 @@ func buildFlushOptions(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*pla
dest = key.DestinationAllShards{}
}

tc := &tableCollector{}
for _, tbl := range stmt.TableNames {
tc.addASTTable(keyspace.Name, tbl)
}

return newPlanResult(&engine.Send{
Keyspace: keyspace,
TargetDestination: dest,
Query: sqlparser.String(stmt),
IsDML: false,
SingleShardOnly: false,
}, tc.getTables()...), nil
Keyspace: keyspace,
TargetDestination: dest,
Query: sqlparser.String(stmt),
ReservedConnectionNeeded: stmt.WithLock,
}), nil
}

func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) {
Expand Down Expand Up @@ -441,9 +435,10 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
if len(tablesMap) == 1 {
for sendDest, tables := range tablesMap {
return newPlanResult(&engine.Send{
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
ReservedConnectionNeeded: stmt.WithLock,
}, tc.getTables()...), nil
}
}
Expand All @@ -455,9 +450,10 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
var sources []engine.Primitive
for _, sendDest := range keys {
plan := &engine.Send{
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])),
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])),
ReservedConnectionNeeded: stmt.WithLock,
}
sources = append(sources, plan)
}
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/planbuilder/locktables.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,5 @@ func buildLockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ planco

// buildUnlockPlan plans lock tables statement.
func buildUnlockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ plancontext.VSchema) (*planResult, error) {
log.Warningf("Unlock Tables statement is ignored: %v", stmt)
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil
return newPlanResult(&engine.Unlock{}), nil
}
40 changes: 39 additions & 1 deletion go/vt/vtgate/planbuilder/testdata/flush_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush local tables with read lock"
"Query": "flush local tables with read lock",
"ReservedConnectionNeeded": true
}
}
},
Expand All @@ -53,5 +54,42 @@
"Query": "flush local hosts, logs"
}
}
},
{
"comment": "Flush statement with multiple tables in different keyspace with read lock",
"query": "flush tables user.music, main.unsharded with read lock",
"plan": {
"QueryType": "FLUSH",
"Original": "flush tables user.music, main.unsharded with read lock",
"Instructions": {
"OperatorType": "Concatenate",
"Inputs": [
{
"OperatorType": "Send",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush tables unsharded with read lock",
"ReservedConnectionNeeded": true
},
{
"OperatorType": "Send",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"TargetDestination": "AllShards()",
"Query": "flush tables music with read lock",
"ReservedConnectionNeeded": true
}
]
},
"TablesUsed": [
"main.unsharded",
"user.music"
]
}
}
]
Loading
Loading