Skip to content

Commit

Permalink
test: added online ddl disruption stress test
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Aug 26, 2024
1 parent 0b1d91e commit 12e2dc5
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 30 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
"--twopc_abandon_age", "1",
"--migration_check_interval", "2s",
)

// Start keyspace
Expand Down
102 changes: 91 additions & 11 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,25 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
)

// TestDisruptions tests that atomic transactions persevere through various disruptions.
func TestDisruptions(t *testing.T) {
testcases := []struct {
disruptionName string
commitDelayTime string
disruption func() error
disruption func(t *testing.T) error
}{
{
disruptionName: "No Disruption",
commitDelayTime: "1",
disruption: func() error {
disruption: func(t *testing.T) error {
return nil
},
},
Expand All @@ -68,6 +71,11 @@ func TestDisruptions(t *testing.T) {
commitDelayTime: "5",
disruption: vttabletRestartShard3,
},
{
disruptionName: "OnlineDDL",
commitDelayTime: "20",
disruption: onlineDDL,
},
{
disruptionName: "EmergencyReparentShard",
commitDelayTime: "5",
Expand Down Expand Up @@ -119,7 +127,7 @@ func TestDisruptions(t *testing.T) {
}()
}
// Run the disruption.
err := tt.disruption()
err := tt.disruption(t)
require.NoError(t, err)
// Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error.
// But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken.
Expand All @@ -145,6 +153,7 @@ func threadToWrite(t *testing.T, ctx context.Context, id int) {
continue
}
_, _ = utils.ExecAllowError(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, %d)", id, rand.Intn(10000)))
conn.Close()
}
}

Expand All @@ -170,11 +179,13 @@ func waitForResults(t *testing.T, query string, resultExpected string, waitTime
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err == nil {
res := utils.Exec(t, conn, query)
res, _ := utils.ExecAllowError(t, conn, query)
conn.Close()
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
if res != nil {
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
}
}
}
time.Sleep(100 * time.Millisecond)
Expand All @@ -187,29 +198,29 @@ Cluster Level Disruptions for the fuzzer
*/

// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary.
func prsShard3() error {
func prsShard3(t *testing.T) error {
	// Shards[2] is the third shard of the first keyspace.
	thirdShard := clusterInstance.Keyspaces[0].Shards[2]
	// The tablet at index 1 is the one asked to become primary.
	candidate := thirdShard.Vttablets[1]
	vtctld := clusterInstance.VtctldClientProcess
	return vtctld.PlannedReparentShard(keyspaceName, thirdShard.Name, candidate.Alias)
}

// ersShard3 runs an ERS in shard 3 of the keyspace. It promotes the second tablet to be the new primary.
func ersShard3() error {
func ersShard3(t *testing.T) error {
	// Shards[2] is the third shard of the first keyspace.
	thirdShard := clusterInstance.Keyspaces[0].Shards[2]
	candidate := thirdShard.Vttablets[1]
	// The command takes the shard as "<keyspace>/<shard>".
	target := fmt.Sprintf("%s/%s", keyspaceName, thirdShard.Name)
	// Only the error matters here; the command output is dropped.
	_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
		"EmergencyReparentShard",
		target,
		"--new-primary", candidate.Alias,
	)
	return err
}

// vttabletRestartShard3 restarts the first vttablet of the third shard.
func vttabletRestartShard3() error {
func vttabletRestartShard3(t *testing.T) error {
	// First tablet (index 0) of the third shard (index 2) in the first keyspace.
	firstTablet := clusterInstance.Keyspaces[0].Shards[2].Vttablets[0]
	return firstTablet.RestartOnlyTablet()
}

// mysqlRestartShard3 restarts MySQL on the first tablet of the third shard.
func mysqlRestartShard3() error {
func mysqlRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
vttablets := shard.Vttablets
tablet := vttablets[0]
Expand All @@ -227,3 +238,72 @@ func mysqlRestartShard3() error {
}
return syscallutil.Kill(pid, syscall.SIGKILL)
}

// State shared by successive runs of the OnlineDDL disruption.
var (
	// randomDDL holds the ALTER statements the OnlineDDL disruption can apply;
	// each one adds a differently named column to twopc_t1.
	randomDDL = []string{
		"alter table twopc_t1 add column extra_col1 varchar(20)",
		"alter table twopc_t1 add column extra_col2 varchar(20)",
		"alter table twopc_t1 add column extra_col3 varchar(20)",
		"alter table twopc_t1 add column extra_col4 varchar(20)",
	}

	// count is the index into randomDDL of the next statement to apply.
	count = 0
)

// onlineDDL runs a DDL statement.
func onlineDDL(t *testing.T) error {
output, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(keyspaceName, randomDDL[count], cluster.ApplySchemaParams{
DDLStrategy: "vitess --force-cut-over-after=1ms",
})
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

// WaitForMigrationStatus polls `show vitess_migrations like '<uuid>'` about once
// per second until, for every shard in shards, the migration uuid reports one of
// expectStatuses, or until timeout elapses.
//
// It returns the most recent migration status read from a relevant shard (the
// empty status if none was ever read). The caller must compare the result with
// the status it wanted: on timeout the last observed status is returned as-is.
// A failing query fails the test through utils.Exec.
func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
	shardNames := make(map[string]bool, len(shards))
	for _, shard := range shards {
		shardNames[shard.Name] = true
	}
	query := fmt.Sprintf("show vitess_migrations like '%s'", uuid)

	statusesMap := make(map[string]bool, len(expectStatuses))
	for _, status := range expectStatuses {
		statusesMap[string(status)] = true
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	lastKnownStatus := ""
	for {
		// A failed connect is treated as "no information this round": fall through
		// to the select below so the retry is paced by the ticker and bounded by the
		// timeout, instead of spinning in a tight loop that never observes ctx.Done().
		conn, err := mysql.Connect(ctx, vtParams)
		if err == nil {
			r := utils.Exec(t, conn, query)
			// Each poll opens its own connection; release it before waiting again.
			conn.Close()

			countMatchedShards := 0
			for _, row := range r.Named().Rows {
				shardName := row["shard"].ToString()
				if !shardNames[shardName] {
					// irrelevant shard
					continue
				}
				lastKnownStatus = row["migration_status"].ToString()
				if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] {
					countMatchedShards++
				}
			}
			if countMatchedShards == len(shards) {
				return schema.OnlineDDLStatus(lastKnownStatus)
			}
		}
		select {
		case <-ctx.Done():
			return schema.OnlineDDLStatus(lastKnownStatus)
		case <-ticker.C:
		}
	}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
return
}
_, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false)
conn.Close()
if err != nil {
fmt.Printf("Error in cleanup deletion - %v\n", err)
conn.Close()
time.Sleep(100 * time.Millisecond)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
// This could be due to ongoing cutover happening in vreplication workflow
// regarding OnlineDDL or MoveTables.
for _, query := range conn.TxProperties().Queries {
qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...)
qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, 0, query.Tables...)
if qr != nil {
act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{})
if act != rules.QRContinue {
Expand All @@ -108,7 +108,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
// If they are put in the prepared pool, then vreplication workflow waits.
// This check helps reject the prepare that came later.
for _, query := range conn.TxProperties().Queries {
qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...)
qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, 0, query.Tables...)
if qr != nil {
act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{})
if act != rules.QRContinue {
Expand Down
6 changes: 2 additions & 4 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
Expand Down Expand Up @@ -212,9 +211,8 @@ func TestExecutorPrepareRuleFailure(t *testing.T) {
sc, err := tsv.te.txPool.GetAndLock(txid, "adding query property")
require.NoError(t, err)
sc.txProps.Queries = append(sc.txProps.Queries, tx.Query{
Sql: "update test_table set col = 5",
PlanType: planbuilder.PlanUpdate,
Tables: []string{"test_table"},
Sql: "update test_table set col = 5",
Tables: []string{"test_table"},
})
sc.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func (qre *QueryExecutor) txFetch(conn *StatefulConnection, record bool) (*sqlty
}
// Only record successful queries.
if record {
conn.TxProperties().RecordQuery(sql, qre.plan.PlanID, qre.plan.TableNames())
conn.TxProperties().RecordQueryDetail(sql, qre.plan.TableNames())
}
return qr, nil
}
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/test/utils"
Expand Down Expand Up @@ -241,7 +242,9 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
turnOnTxEngine()
assert.EqualValues(t, 1, len(tsv.te.preparedPool.conns), "len(tsv.te.preparedPool.conns)")
got := tsv.te.preparedPool.conns["dtid0"].TxProperties().Queries
want := []string{"update test_table set `name` = 2 where pk = 1 limit 10001"}
want := []tx.Query{{
Sql: "update test_table set `name` = 2 where pk = 1 limit 10001",
Tables: []string{"test_table"}}}
utils.MustMatch(t, want, got, "Prepared queries")
turnOffTxEngine()
assert.Empty(t, tsv.te.preparedPool.conns, "tsv.te.preparedPool.conns")
Expand Down Expand Up @@ -275,7 +278,9 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
turnOnTxEngine()
assert.EqualValues(t, 1, len(tsv.te.preparedPool.conns), "len(tsv.te.preparedPool.conns)")
got = tsv.te.preparedPool.conns["a:b:10"].TxProperties().Queries
want = []string{"update test_table set `name` = 2 where pk = 1 limit 10001"}
want = []tx.Query{{
Sql: "update test_table set `name` = 2 where pk = 1 limit 10001",
Tables: []string{"test_table"}}}
utils.MustMatch(t, want, got, "Prepared queries")
wantFailed := map[string]error{"a:b:20": errPrepFailed}
utils.MustMatch(t, tsv.te.preparedPool.reserved, wantFailed, fmt.Sprintf("Failed dtids: %v, want %v", tsv.te.preparedPool.reserved, wantFailed))
Expand Down
33 changes: 24 additions & 9 deletions go/vt/vttablet/tabletserver/tx/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
)

type (
Expand Down Expand Up @@ -64,9 +63,8 @@ type (
)

type Query struct {
Sql string
PlanType planbuilder.PlanType
Tables []string
Sql string
Tables []string
}

const (
Expand Down Expand Up @@ -121,15 +119,32 @@ var txNames = map[ReleaseReason]string{
ConnRenewFail: "renewFail",
}

// RecordQuery records the query against this transaction.
func (p *Properties) RecordQuery(query string, planType planbuilder.PlanType, tables []string) {
// RecordQueryDetail records the query and tables against this transaction.
func (p *Properties) RecordQueryDetail(query string, tables []string) {
if p == nil {
return
}
p.Queries = append(p.Queries, Query{
Sql: query,
PlanType: planType,
Tables: tables,
Sql: query,
Tables: tables,
})
}

// RecordQuery parses query with parser, extracts the tables it references and
// appends the query together with those tables to this transaction's Queries.
// It is a no-op on a nil receiver, and records nothing if the query fails to parse.
func (p *Properties) RecordQuery(query string, parser *sqlparser.Parser) {
	if p == nil {
		return
	}
	parsed, parseErr := parser.Parse(query)
	if parseErr != nil {
		// This should never happen, but if it does,
		// we would not be able to block cut-overs on this query.
		return
	}
	recorded := Query{
		Sql:    query,
		Tables: sqlparser.ExtractAllTables(parsed),
	}
	p.Queries = append(p.Queries, recorded)
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ outer:
continue
}
for _, stmt := range preparedTx.Queries {
conn.TxProperties().RecordQuery(stmt, te.env.Environment().Parser())
_, err := conn.Exec(ctx, stmt, 1, false)
if err != nil {
allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid))
Expand Down

0 comments on commit 12e2dc5

Please sign in to comment.