Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-17.0] Protect ExecuteFetchAsDBA against multi-statements, excluding a sequence of CREATE TABLE|VIEW. (#14954) #14983

Merged
merged 3 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,12 @@ func tearDown(t *testing.T, initMysql bool) {
}
caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})
require.True(t, caughtUp, "Timed out waiting for all replicas to catch up")
promoteCommands := "STOP SLAVE; RESET SLAVE ALL; RESET MASTER;"
disableSemiSyncCommands := "SET GLOBAL rpl_semi_sync_master_enabled = false; SET GLOBAL rpl_semi_sync_slave_enabled = false"
promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"}
disableSemiSyncCommands := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"}
for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} {
_, err := tablet.VttabletProcess.QueryTablet(promoteCommands, keyspaceName, true)
err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true)
require.Nil(t, err)
_, err = tablet.VttabletProcess.QueryTablet(disableSemiSyncCommands, keyspaceName, true)
err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommands, keyspaceName, true)
require.Nil(t, err)
}

Expand Down
28 changes: 28 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,34 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD
return executeQuery(conn, query)
}

// QueryTabletMultiple lets you execute multiple queries -- without any
// results -- against the tablet.
func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error {
conn, err := vttablet.TabletConn(keyspace, useDb)
if err != nil {
return err
}
defer conn.Close()

for _, query := range queries {
log.Infof("Executing query %s (on %s)", query, vttablet.Name)
_, err := executeQuery(conn, query)
if err != nil {
return err
}
}
return nil
}

// TabletConn opens a MySQL connection on this tablet
func (vttablet *VttabletProcess) TabletConn(keyspace string, useDb bool) (*mysql.Conn, error) {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
return vttablet.conn(&dbParams)
}

func (vttablet *VttabletProcess) defaultConn(dbname string) (*mysql.Conn, error) {
dbParams := mysql.ConnParams{
Uname: "vt_dba",
Expand Down
48 changes: 45 additions & 3 deletions go/test/endtoend/clustertest/vtctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,51 @@ func testTabletStatus(t *testing.T) {
}

func testExecuteAsDba(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
tcases := []struct {
query string
result string
expectErr bool
}{
{
query: "",
expectErr: true,
},
{
query: "SELECT 1 AS a",
result: oneTableOutput,
},
{
query: "SELECT 1 AS a; SELECT 1 AS a",
expectErr: true,
},
{
query: "create table t(id int)",
result: "",
},
{
query: "create table if not exists t(id int)",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int);",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int); SELECT 1 AS a",
expectErr: true,
},
}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
if tcase.expectErr {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tcase.result, result)
}
})
}
}

func testExecuteAsApp(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/mysqlserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
PARTITION BY HASH( TO_DAYS(created) )
PARTITIONS 10;
`
createProcSQL = `use vt_test_keyspace;
createProcSQL = `
CREATE PROCEDURE testing()
BEGIN
delete from vt_insert_test;
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestMain(m *testing.M) {
}

primaryTabletProcess := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet().VttabletProcess
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, false); err != nil {
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, true); err != nil {
return 1, err
}

Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ func TestNoReplicationStatusAndIOThreadStopped(t *testing.T) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE; RESET SLAVE ALL`)
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE`)
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `RESET SLAVE ALL`)
require.NoError(t, err)
//
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[3].Alias, `STOP SLAVE IO_THREAD;`)
require.NoError(t, err)
// Run an additional command in the current primary which will only be acked by tablets[2] and be in its relay log.
Expand Down
29 changes: 18 additions & 11 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestPRSWithDrainedLaggingTablet(t *testing.T) {
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// make tablets[1 lag from the other tablets by setting the delay to a large number
utils.RunSQL(context.Background(), t, `stop slave;CHANGE MASTER TO MASTER_DELAY = 1999;start slave;`, tablets[1])
utils.RunSQLs(context.Background(), t, []string{`stop slave`, `CHANGE MASTER TO MASTER_DELAY = 1999`, `start slave;`}, tablets[1])

// insert another row in tablets[1
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[2], tablets[3]})
Expand Down Expand Up @@ -224,26 +224,33 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus
}

// commands to convert a replica to be writable
promoteReplicaCommands := "STOP SLAVE; RESET SLAVE ALL; SET GLOBAL read_only = OFF;"
utils.RunSQL(ctx, t, promoteReplicaCommands, tablets[1])
promoteReplicaCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "SET GLOBAL read_only = OFF"}
utils.RunSQLs(ctx, t, promoteReplicaCommands, tablets[1])

// Get primary position
_, gtID := cluster.GetPrimaryPosition(t, *tablets[1], utils.Hostname)

// tablets[0] will now be a replica of tablets[1
changeReplicationSourceCommands := fmt.Sprintf("RESET MASTER; RESET SLAVE; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[0])
changeReplicationSourceCommands := []string{
"RESET MASTER",
"RESET SLAVE",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[0])

// Capture time when we made tablets[1 writable
baseTime := time.Now().UnixNano() / 1000000000

// tablets[2 will be a replica of tablets[1
changeReplicationSourceCommands = fmt.Sprintf("STOP SLAVE; RESET MASTER; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[2])
changeReplicationSourceCommands = []string{
"STOP SLAVE",
"RESET MASTER",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
"START SLAVE",
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[2])

// To test the downPrimary, we kill the old primary first and delete its tablet record
if downPrimary {
Expand Down
9 changes: 9 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ func getMysqlConnParam(tablet *cluster.Vttablet) mysql.ConnParams {
return connParams
}

// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet
func RunSQLs(ctx context.Context, t *testing.T, sqls []string, tablet *cluster.Vttablet) (results []*sqltypes.Result) {
for _, sql := range sqls {
result := RunSQL(ctx, t, sql, tablet)
results = append(results, result)
}
return results
}

// RunSQL is used to run a SQL command directly on the MySQL instance of a vttablet
func RunSQL(ctx context.Context, t *testing.T, sql string, tablet *cluster.Vttablet) *sqltypes.Result {
tabletParams := getMysqlConnParam(tablet)
Expand Down
21 changes: 14 additions & 7 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("ReplicationFromOtherReplica", func(t *testing.T) {
// point replica at otherReplica
changeReplicationSourceCommand := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1; START SLAVE", utils.Hostname, otherReplica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommand, replica, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, otherReplica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, replica, "")
require.NoError(t, err)

// wait until the source port is set back correctly by vtorc
Expand All @@ -199,10 +203,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("CircularReplication", func(t *testing.T) {
// change the replication source on the primary
changeReplicationSourceCommands := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", replica.VttabletProcess.TabletHostname, replica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommands, curPrimary, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", replica.VttabletProcess.TabletHostname, replica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, curPrimary, "")
require.NoError(t, err)

// wait for curPrimary to reach stable state
Expand Down
21 changes: 20 additions & 1 deletion go/vt/sqlparser/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,26 @@ func TestSplitStatementToPieces(t *testing.T) {
"`createtime` datetime NOT NULL DEFAULT NOW() COMMENT 'create time;'," +
"`comment` varchar(100) NOT NULL DEFAULT '' COMMENT 'comment'," +
"PRIMARY KEY (`id`))",
}}
}, {
input: "create table t1 (id int primary key); create table t2 (id int primary key);",
output: "create table t1 (id int primary key); create table t2 (id int primary key)",
}, {
input: ";;; create table t1 (id int primary key);;; ;create table t2 (id int primary key);",
output: " create table t1 (id int primary key);create table t2 (id int primary key)",
}, {
// The input doesn't have to be valid SQL statements!
input: ";create table t1 ;create table t2 (id;",
output: "create table t1 ;create table t2 (id",
}, {
// Ignore quoted semicolon
input: ";create table t1 ';';;;create table t2 (id;",
output: "create table t1 ';';create table t2 (id",
}, {
// Ignore quoted semicolon
input: "stop replica; start replica",
output: "stop replica; start replica",
},
}

for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions go/vt/sqlparser/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sqlparser
import (
"fmt"
"sort"
"strings"

querypb "vitess.io/vitess/go/vt/proto/query"
)
Expand Down Expand Up @@ -160,3 +161,22 @@ func ReplaceTableQualifiers(query, olddb, newdb string) (string, error) {
}
return query, nil
}

// ReplaceTableQualifiersMultiQuery accepts a multi-query string and modifies it
// via ReplaceTableQualifiers, one query at a time.
func ReplaceTableQualifiersMultiQuery(multiQuery, olddb, newdb string) (string, error) {
queries, err := SplitStatementToPieces(multiQuery)
if err != nil {
return multiQuery, err
}
var modifiedQueries []string
for _, query := range queries {
// Replace any provided sidecar database qualifiers with the correct one.
query, err := ReplaceTableQualifiers(query, olddb, newdb)
if err != nil {
return query, err
}
modifiedQueries = append(modifiedQueries, query)
}
return strings.Join(modifiedQueries, ";"), nil
}
68 changes: 68 additions & 0 deletions go/vt/sqlparser/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,71 @@ func TestReplaceTableQualifiers(t *testing.T) {
})
}
}

func TestReplaceTableQualifiersMultiQuery(t *testing.T) {
origDB := "_vt"
tests := []struct {
name string
in string
newdb string
out string
wantErr bool
}{
{
name: "invalid select",
in: "select frog bar person",
out: "",
wantErr: true,
},
{
name: "simple select",
in: "select * from _vt.foo",
out: "select * from foo",
},
{
name: "simple select with new db",
in: "select * from _vt.foo",
newdb: "_vt_test",
out: "select * from _vt_test.foo",
},
{
name: "simple select with new db same",
in: "select * from _vt.foo where id=1", // should be unchanged
newdb: "_vt",
out: "select * from _vt.foo where id=1",
},
{
name: "simple select with new db needing escaping",
in: "select * from _vt.foo",
newdb: "1_vt-test",
out: "select * from `1_vt-test`.foo",
},
{
name: "multi query",
in: "select * from _vt.foo ; select * from _vt.bar",
out: "select * from foo;select * from bar",
},
{
name: "multi query with new db",
in: "select * from _vt.foo ; select * from _vt.bar",
newdb: "_vt_test",
out: "select * from _vt_test.foo;select * from _vt_test.bar",
},
{
name: "multi query with error",
in: "select * from _vt.foo ; select * from _vt.bar ; sel ect fr om wh at",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ReplaceTableQualifiersMultiQuery(tt.in, origDB, tt.newdb)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.out, got, "RemoveTableQualifiers(); in: %s, out: %s", tt.in, got)
})
}
}
Loading
Loading