Skip to content

Commit

Permalink
Protect ExecuteFetchAsDBA against multi-statements, excluding a seq…
Browse files Browse the repository at this point in the history
…uence of `CREATE TABLE|VIEW`. (#14954)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jan 18, 2024
1 parent 8d90332 commit 7800699
Show file tree
Hide file tree
Showing 14 changed files with 424 additions and 50 deletions.
8 changes: 4 additions & 4 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ func tearDown(t *testing.T, initMysql bool) {
}
caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})
require.True(t, caughtUp, "Timed out waiting for all replicas to catch up")
promoteCommands := "STOP SLAVE; RESET SLAVE ALL; RESET MASTER;"
disableSemiSyncCommands := "SET GLOBAL rpl_semi_sync_master_enabled = false; SET GLOBAL rpl_semi_sync_slave_enabled = false"
promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"}
disableSemiSyncCommands := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"}
for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} {
_, err := tablet.VttabletProcess.QueryTablet(promoteCommands, keyspaceName, true)
err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true)
require.Nil(t, err)
_, err = tablet.VttabletProcess.QueryTablet(disableSemiSyncCommands, keyspaceName, true)
err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommands, keyspaceName, true)
require.Nil(t, err)
}

Expand Down
48 changes: 45 additions & 3 deletions go/test/endtoend/clustertest/vtctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,51 @@ func testTabletStatus(t *testing.T) {
}

func testExecuteAsDba(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
tcases := []struct {
query string
result string
expectErr bool
}{
{
query: "",
expectErr: true,
},
{
query: "SELECT 1 AS a",
result: oneTableOutput,
},
{
query: "SELECT 1 AS a; SELECT 1 AS a",
expectErr: true,
},
{
query: "create table t(id int)",
result: "",
},
{
query: "create table if not exists t(id int)",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int);",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int); SELECT 1 AS a",
expectErr: true,
},
}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
if tcase.expectErr {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tcase.result, result)
}
})
}
}

func testExecuteAsApp(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/mysqlserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
PARTITION BY HASH( TO_DAYS(created) )
PARTITIONS 10;
`
createProcSQL = `use vt_test_keyspace;
createProcSQL = `
CREATE PROCEDURE testing()
BEGIN
delete from vt_insert_test;
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestMain(m *testing.M) {
}

primaryTabletProcess := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet().VttabletProcess
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, false); err != nil {
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, true); err != nil {
return 1, err
}

Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ func TestNoReplicationStatusAndIOThreadStopped(t *testing.T) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE; RESET SLAVE ALL`)
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE`)
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `RESET SLAVE ALL`)
require.NoError(t, err)
//
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[3].Alias, `STOP SLAVE IO_THREAD;`)
require.NoError(t, err)
// Run an additional command in the current primary which will only be acked by tablets[2] and be in its relay log.
Expand Down
29 changes: 18 additions & 11 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestPRSWithDrainedLaggingTablet(t *testing.T) {
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// make tablets[1 lag from the other tablets by setting the delay to a large number
utils.RunSQL(context.Background(), t, `stop slave;CHANGE MASTER TO MASTER_DELAY = 1999;start slave;`, tablets[1])
utils.RunSQLs(context.Background(), t, []string{`stop slave`, `CHANGE MASTER TO MASTER_DELAY = 1999`, `start slave;`}, tablets[1])

// insert another row in tablets[1
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[2], tablets[3]})
Expand Down Expand Up @@ -226,26 +226,33 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus
}

// commands to convert a replica to be writable
promoteReplicaCommands := "STOP SLAVE; RESET SLAVE ALL; SET GLOBAL read_only = OFF;"
utils.RunSQL(ctx, t, promoteReplicaCommands, tablets[1])
promoteReplicaCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "SET GLOBAL read_only = OFF"}
utils.RunSQLs(ctx, t, promoteReplicaCommands, tablets[1])

// Get primary position
_, gtID := cluster.GetPrimaryPosition(t, *tablets[1], utils.Hostname)

// tablets[0] will now be a replica of tablets[1
changeReplicationSourceCommands := fmt.Sprintf("RESET MASTER; RESET SLAVE; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[0])
changeReplicationSourceCommands := []string{
"RESET MASTER",
"RESET SLAVE",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[0])

// Capture time when we made tablets[1 writable
baseTime := time.Now().UnixNano() / 1000000000

// tablets[2 will be a replica of tablets[1
changeReplicationSourceCommands = fmt.Sprintf("STOP SLAVE; RESET MASTER; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[2])
changeReplicationSourceCommands = []string{
"STOP SLAVE",
"RESET MASTER",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
"START SLAVE",
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[2])

// To test the downPrimary, we kill the old primary first and delete its tablet record
if downPrimary {
Expand Down
9 changes: 9 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ func getMysqlConnParam(tablet *cluster.Vttablet) mysql.ConnParams {
return connParams
}

// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet
func RunSQLs(ctx context.Context, t *testing.T, sqls []string, tablet *cluster.Vttablet) (results []*sqltypes.Result) {
for _, sql := range sqls {
result := RunSQL(ctx, t, sql, tablet)
results = append(results, result)
}
return results
}

// RunSQL is used to run a SQL command directly on the MySQL instance of a vttablet
func RunSQL(ctx context.Context, t *testing.T, sql string, tablet *cluster.Vttablet) *sqltypes.Result {
tabletParams := getMysqlConnParam(tablet)
Expand Down
21 changes: 14 additions & 7 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("ReplicationFromOtherReplica", func(t *testing.T) {
// point replica at otherReplica
changeReplicationSourceCommand := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1; START SLAVE", utils.Hostname, otherReplica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommand, replica, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, otherReplica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, replica, "")
require.NoError(t, err)

// wait until the source port is set back correctly by vtorc
Expand All @@ -204,10 +208,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("CircularReplication", func(t *testing.T) {
// change the replication source on the primary
changeReplicationSourceCommands := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", replica.VttabletProcess.TabletHostname, replica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommands, curPrimary, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", replica.VttabletProcess.TabletHostname, replica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, curPrimary, "")
require.NoError(t, err)

// wait for curPrimary to reach stable state
Expand Down
16 changes: 3 additions & 13 deletions go/vt/schemadiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package schemadiff

import (
"errors"
"fmt"
"io"
"sort"
"strings"

Expand Down Expand Up @@ -118,17 +116,9 @@ func NewSchemaFromQueries(env *Environment, queries []string) (*Schema, error) {
// NewSchemaFromSQL creates a valid and normalized schema based on a SQL blob that contains
// CREATE statements for various objects (tables, views)
func NewSchemaFromSQL(env *Environment, sql string) (*Schema, error) {
var statements []sqlparser.Statement
tokenizer := env.Parser.NewStringTokenizer(sql)
for {
stmt, err := sqlparser.ParseNextStrictDDL(tokenizer)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("could not parse statement in SQL: %v: %w", sql, err)
}
statements = append(statements, stmt)
statements, err := env.Parser.SplitStatements(sql)
if err != nil {
return nil, err
}
return NewSchemaFromStatements(env, statements)
}
Expand Down
75 changes: 75 additions & 0 deletions go/vt/sqlparser/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,77 @@ func TestColumns_FindColumn(t *testing.T) {
}
}

func TestSplitStatements(t *testing.T) {
testcases := []struct {
input string
stmts int
wantErr bool
}{
{
input: "select * from table1; \t; \n; \n\t\t ;select * from table1;",
stmts: 2,
}, {
input: "select * from table1",
stmts: 1,
}, {
input: "select * from table1;",
stmts: 1,
}, {
input: "select * from table1; ",
stmts: 1,
}, {
input: "select * from table1; select * from table2;",
stmts: 2,
}, {
input: "create /*vt+ directive=true */ table t1 (id int); create table t2 (id int); create table t3 (id int)",
stmts: 3,
}, {
input: "create /*vt+ directive=true */ table t1 (id int); create table t2 (id int); create table t3 (id int);",
stmts: 3,
}, {
input: "select * from /* comment ; */ table1;",
stmts: 1,
}, {
input: "select * from table1 where semi = ';';",
stmts: 1,
}, {
input: "CREATE TABLE `total_data` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id', " +
"`region` varchar(32) NOT NULL COMMENT 'region name, like zh; th; kepler'," +
"`data_size` bigint NOT NULL DEFAULT '0' COMMENT 'data size;'," +
"`createtime` datetime NOT NULL DEFAULT NOW() COMMENT 'create time;'," +
"`comment` varchar(100) NOT NULL DEFAULT '' COMMENT 'comment'," +
"PRIMARY KEY (`id`))",
stmts: 1,
}, {
input: "create table t1 (id int primary key); create table t2 (id int primary key);",
stmts: 2,
}, {
input: ";;; create table t1 (id int primary key);;; ;create table t2 (id int primary key);",
stmts: 2,
}, {
input: ";create table t1 ;create table t2 (id;",
wantErr: true,
}, {
// Ignore quoted semicolon
input: ";create table t1 ';';;;create table t2 (id;",
wantErr: true,
},
}

parser := NewTestParser()
for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
statements, err := parser.SplitStatements(tcase.input)
if tcase.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tcase.stmts, len(statements))
}
})
}
}

func TestSplitStatementToPieces(t *testing.T) {
testcases := []struct {
input string
Expand Down Expand Up @@ -745,6 +816,10 @@ func TestSplitStatementToPieces(t *testing.T) {
// Ignore quoted semicolon
input: ";create table t1 ';';;;create table t2 (id;",
output: "create table t1 ';';create table t2 (id",
}, {
// Ignore quoted semicolon
input: "stop replica; start replica",
output: "stop replica; start replica",
},
}

Expand Down
17 changes: 17 additions & 0 deletions go/vt/sqlparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sqlparser

import (
"errors"
"fmt"
"io"
"strconv"
Expand Down Expand Up @@ -229,6 +230,22 @@ func (p *Parser) SplitStatement(blob string) (string, string, error) {
return blob, "", nil
}

// SplitStatements splits a given blob into multiple SQL statements.
func (p *Parser) SplitStatements(blob string) (statements []Statement, err error) {
tokenizer := p.NewStringTokenizer(blob)
for {
stmt, err := ParseNext(tokenizer)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, err
}
statements = append(statements, stmt)
}
return statements, nil
}

// SplitStatementToPieces split raw sql statement that may have multi sql pieces to sql pieces
// returns the sql pieces blob contains; or error if sql cannot be parsed
func (p *Parser) SplitStatementToPieces(blob string) (pieces []string, err error) {
Expand Down
20 changes: 20 additions & 0 deletions go/vt/sqlparser/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sqlparser
import (
"fmt"
"sort"
"strings"

querypb "vitess.io/vitess/go/vt/proto/query"
)
Expand Down Expand Up @@ -160,3 +161,22 @@ func (p *Parser) ReplaceTableQualifiers(query, olddb, newdb string) (string, err
}
return query, nil
}

// ReplaceTableQualifiersMultiQuery accepts a multi-query string and modifies it
// via ReplaceTableQualifiers, one query at a time.
func (p *Parser) ReplaceTableQualifiersMultiQuery(multiQuery, olddb, newdb string) (string, error) {
queries, err := p.SplitStatementToPieces(multiQuery)
if err != nil {
return multiQuery, err
}
var modifiedQueries []string
for _, query := range queries {
// Replace any provided sidecar database qualifiers with the correct one.
query, err := p.ReplaceTableQualifiers(query, olddb, newdb)
if err != nil {
return query, err
}
modifiedQueries = append(modifiedQueries, query)
}
return strings.Join(modifiedQueries, ";"), nil
}
Loading

0 comments on commit 7800699

Please sign in to comment.