Skip to content

Commit

Permalink
[release-16.0] Protect ExecuteFetchAsDBA against multi-statements, …
Browse files Browse the repository at this point in the history
…excluding a sequence of `CREATE TABLE|VIEW`. (#14954) (#14982)

Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
vitess-bot[bot] and shlomi-noach authored Jan 18, 2024
1 parent 4cf4b7b commit 39cc3c1
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 37 deletions.
8 changes: 4 additions & 4 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,12 @@ func tearDown(t *testing.T, initMysql bool) {
}
caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})
require.True(t, caughtUp, "Timed out waiting for all replicas to catch up")
promoteCommands := "STOP SLAVE; RESET SLAVE ALL; RESET MASTER;"
disableSemiSyncCommands := "SET GLOBAL rpl_semi_sync_master_enabled = false; SET GLOBAL rpl_semi_sync_slave_enabled = false"
promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"}
disableSemiSyncCommands := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"}
for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} {
_, err := tablet.VttabletProcess.QueryTablet(promoteCommands, keyspaceName, true)
err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true)
require.Nil(t, err)
_, err = tablet.VttabletProcess.QueryTablet(disableSemiSyncCommands, keyspaceName, true)
err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommands, keyspaceName, true)
require.Nil(t, err)
}

Expand Down
28 changes: 28 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,34 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD
return executeQuery(conn, query)
}

// QueryTabletMultiple lets you execute multiple queries -- without any
// results -- against the tablet.
func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error {
conn, err := vttablet.TabletConn(keyspace, useDb)
if err != nil {
return err
}
defer conn.Close()

for _, query := range queries {
log.Infof("Executing query %s (on %s)", query, vttablet.Name)
_, err := executeQuery(conn, query)
if err != nil {
return err
}
}
return nil
}

// TabletConn opens a MySQL connection on this tablet
func (vttablet *VttabletProcess) TabletConn(keyspace string, useDb bool) (*mysql.Conn, error) {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
return vttablet.conn(&dbParams)
}

func (vttablet *VttabletProcess) defaultConn(dbname string) (*mysql.Conn, error) {
dbParams := mysql.ConnParams{
Uname: "vt_dba",
Expand Down
48 changes: 45 additions & 3 deletions go/test/endtoend/clustertest/vtctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,51 @@ func testTabletStatus(t *testing.T) {
}

func testExecuteAsDba(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
tcases := []struct {
query string
result string
expectErr bool
}{
{
query: "",
expectErr: true,
},
{
query: "SELECT 1 AS a",
result: oneTableOutput,
},
{
query: "SELECT 1 AS a; SELECT 1 AS a",
expectErr: true,
},
{
query: "create table t(id int)",
result: "",
},
{
query: "create table if not exists t(id int)",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int);",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int); SELECT 1 AS a",
expectErr: true,
},
}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
if tcase.expectErr {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tcase.result, result)
}
})
}
}

func testExecuteAsApp(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/mysqlserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
PARTITION BY HASH( TO_DAYS(created) )
PARTITIONS 10;
`
createProcSQL = `use vt_test_keyspace;
createProcSQL = `
CREATE PROCEDURE testing()
BEGIN
delete from vt_insert_test;
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestMain(m *testing.M) {
}

primaryTabletProcess := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet().VttabletProcess
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, false); err != nil {
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, true); err != nil {
return 1, err
}

Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ func TestNoReplicationStatusAndIOThreadStopped(t *testing.T) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE; RESET SLAVE ALL`)
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE`)
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `RESET SLAVE ALL`)
require.NoError(t, err)
//
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[3].Alias, `STOP SLAVE IO_THREAD;`)
require.NoError(t, err)
// Run an additional command in the current primary which will only be acked by tablets[2] and be in its relay log.
Expand Down
29 changes: 18 additions & 11 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestPRSWithDrainedLaggingTablet(t *testing.T) {
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// make tablets[1 lag from the other tablets by setting the delay to a large number
utils.RunSQL(context.Background(), t, `stop slave;CHANGE MASTER TO MASTER_DELAY = 1999;start slave;`, tablets[1])
utils.RunSQLs(context.Background(), t, []string{`stop slave`, `CHANGE MASTER TO MASTER_DELAY = 1999`, `start slave;`}, tablets[1])

// insert another row in tablets[1
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[2], tablets[3]})
Expand Down Expand Up @@ -213,26 +213,33 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus
}

// commands to convert a replica to be writable
promoteReplicaCommands := "STOP SLAVE; RESET SLAVE ALL; SET GLOBAL read_only = OFF;"
utils.RunSQL(ctx, t, promoteReplicaCommands, tablets[1])
promoteReplicaCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "SET GLOBAL read_only = OFF"}
utils.RunSQLs(ctx, t, promoteReplicaCommands, tablets[1])

// Get primary position
_, gtID := cluster.GetPrimaryPosition(t, *tablets[1], utils.Hostname)

// tablets[0] will now be a replica of tablets[1
changeReplicationSourceCommands := fmt.Sprintf("RESET MASTER; RESET SLAVE; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[0])
changeReplicationSourceCommands := []string{
"RESET MASTER",
"RESET SLAVE",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[0])

// Capture time when we made tablets[1 writable
baseTime := time.Now().UnixNano() / 1000000000

// tablets[2 will be a replica of tablets[1
changeReplicationSourceCommands = fmt.Sprintf("STOP SLAVE; RESET MASTER; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[2])
changeReplicationSourceCommands = []string{
"STOP SLAVE",
"RESET MASTER",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
"START SLAVE",
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[2])

// To test the downPrimary, we kill the old primary first and delete its tablet record
if downPrimary {
Expand Down
9 changes: 9 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ func getMysqlConnParam(tablet *cluster.Vttablet) mysql.ConnParams {
return connParams
}

// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet
func RunSQLs(ctx context.Context, t *testing.T, sqls []string, tablet *cluster.Vttablet) (results []*sqltypes.Result) {
for _, sql := range sqls {
result := RunSQL(ctx, t, sql, tablet)
results = append(results, result)
}
return results
}

// RunSQL is used to run a SQL command directly on the MySQL instance of a vttablet
func RunSQL(ctx context.Context, t *testing.T, sql string, tablet *cluster.Vttablet) *sqltypes.Result {
tabletParams := getMysqlConnParam(tablet)
Expand Down
21 changes: 14 additions & 7 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("ReplicationFromOtherReplica", func(t *testing.T) {
// point replica at otherReplica
changeReplicationSourceCommand := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1; START SLAVE", utils.Hostname, otherReplica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommand, replica, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, otherReplica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, replica, "")
require.NoError(t, err)

// wait until the source port is set back correctly by vtorc
Expand All @@ -199,10 +203,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("CircularReplication", func(t *testing.T) {
// change the replication source on the primary
changeReplicationSourceCommands := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", replica.VttabletProcess.TabletHostname, replica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommands, curPrimary, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", replica.VttabletProcess.TabletHostname, replica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, curPrimary, "")
require.NoError(t, err)

// wait for curPrimary to reach stable state
Expand Down
20 changes: 20 additions & 0 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,26 @@ func RunSQL(t *testing.T, sql string, tablet *cluster.Vttablet, db string) (*sql
return execute(t, conn, sql)
}

// RunSQLs is used to run a list of SQL statements on the given tablet
func RunSQLs(t *testing.T, sqls []string, tablet *cluster.Vttablet, db string) error {
// Get Connection
tabletParams := getMysqlConnParam(tablet, db)
var timeoutDuration = time.Duration(5 * len(sqls))
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second)
defer cancel()
conn, err := mysql.Connect(ctx, &tabletParams)
require.Nil(t, err)
defer conn.Close()

// Run SQLs
for _, sql := range sqls {
if _, err := execute(t, conn, sql); err != nil {
return err
}
}
return nil
}

func execute(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) {
t.Helper()
return conn.ExecuteFetch(query, 1000, true)
Expand Down
21 changes: 20 additions & 1 deletion go/vt/sqlparser/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,26 @@ func TestSplitStatementToPieces(t *testing.T) {
"`createtime` datetime NOT NULL DEFAULT NOW() COMMENT 'create time;'," +
"`comment` varchar(100) NOT NULL DEFAULT '' COMMENT 'comment'," +
"PRIMARY KEY (`id`))",
}}
}, {
input: "create table t1 (id int primary key); create table t2 (id int primary key);",
output: "create table t1 (id int primary key); create table t2 (id int primary key)",
}, {
input: ";;; create table t1 (id int primary key);;; ;create table t2 (id int primary key);",
output: " create table t1 (id int primary key);create table t2 (id int primary key)",
}, {
// The input doesn't have to be valid SQL statements!
input: ";create table t1 ;create table t2 (id;",
output: "create table t1 ;create table t2 (id",
}, {
// Ignore quoted semicolon
input: ";create table t1 ';';;;create table t2 (id;",
output: "create table t1 ';';create table t2 (id",
}, {
// Ignore quoted semicolon
input: "stop replica; start replica",
output: "stop replica; start replica",
},
}

for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
Expand Down
69 changes: 61 additions & 8 deletions go/vt/vttablet/tabletmanager/rpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,50 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// analyzeExecuteFetchAsDbaMultiQuery reutrns 'true' when at least one of the queries
// in the given SQL has a `/*vt+ allowZeroInDate=true */` directive.
func analyzeExecuteFetchAsDbaMultiQuery(sql string) (queries []string, parseable bool, countCreate int, allowZeroInDate bool, err error) {
queries, err = sqlparser.SplitStatementToPieces(sql)
if err != nil {
return nil, false, 0, false, err
}
if len(queries) == 0 {
return nil, false, 0, false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "no statements found in query: %s", sql)
}
parseable = true
for _, query := range queries {
// Some of the queries we receive here are legitimately non-parseable by our
// current parser, such as `CHANGE REPLICATION SOURCE TO...`. We must allow
// them and so we skip parsing errors.
stmt, err := sqlparser.Parse(query)
if err != nil {
parseable = false
continue
}
switch stmt.(type) {
case *sqlparser.CreateTable, *sqlparser.CreateView:
countCreate++
default:
}

if cmnt, ok := stmt.(sqlparser.Commented); ok {
directives := cmnt.GetParsedComments().Directives()
if directives.IsSet("allowZeroInDate") {
allowZeroInDate = true
}
}

}
return queries, parseable, countCreate, allowZeroInDate, nil
}

// ExecuteFetchAsDba will execute the given query, possibly disabling binlogs and reload schema.
func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) {
// get a connection
Expand All @@ -51,20 +90,34 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanag
_, _ = conn.ExecuteFetch("USE "+sqlescape.EscapeID(req.DbName), 1, false)
}

// Handle special possible directives
var directives *sqlparser.CommentDirectives
if stmt, err := sqlparser.Parse(string(req.Query)); err == nil {
if cmnt, ok := stmt.(sqlparser.Commented); ok {
directives = cmnt.GetParsedComments().Directives()
statements, _, countCreate, allowZeroInDate, err := analyzeExecuteFetchAsDbaMultiQuery(string(req.Query))
if err != nil {
return nil, err
}
if len(statements) > 1 {
// Up to v19, we allow multi-statement SQL in ExecuteFetchAsDba, but only for the specific case
// where all statements are CREATE TABLE or CREATE VIEW. This is to support `ApplySchema --batch-size`.
// In v20, we will not support multi statements whatsoever.
// v20 will throw an error by virtua of using ExecuteFetch instead of ExecuteFetchMulti.
if countCreate != len(statements) {
return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "multi statement queries are not supported in ExecuteFetchAsDba unless all are CREATE TABLE or CREATE VIEW")
}
}
if directives.IsSet("allowZeroInDate") {
if allowZeroInDate {
if _, err := conn.ExecuteFetch("set @@session.sql_mode=REPLACE(REPLACE(@@session.sql_mode, 'NO_ZERO_DATE', ''), 'NO_ZERO_IN_DATE', '')", 1, false); err != nil {
return nil, err
}
}
// run the query
result, err := conn.ExecuteFetch(string(req.Query), int(req.MaxRows), true /*wantFields*/)
// TODO(shlomi): we use ExecuteFetchMulti for backwards compatibility. In v20 we will not accept
// multi statement queries in ExecuteFetchAsDBA. This will be rewritten as:
// (in v20): result, err := ExecuteFetch(uq, int(req.MaxRows), true /*wantFields*/)
result, more, err := conn.ExecuteFetchMulti(string(req.Query), int(req.MaxRows), true /*wantFields*/)
for more {
_, more, _, err = conn.ReadQueryResult(0, false)
if err != nil {
return nil, err
}
}

// re-enable binlogs if necessary
if req.DisableBinlogs && !conn.IsClosed() {
Expand Down
Loading

0 comments on commit 39cc3c1

Please sign in to comment.