Skip to content

Commit

Permalink
Reduce flakiness in TwoPC testing (#16712)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Sep 5, 2024
1 parent 1ed18d0 commit 5ced946
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
9 changes: 8 additions & 1 deletion go/test/endtoend/cluster/move_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cluster

import (
"fmt"
"strings"
"testing"
"time"
)
Expand All @@ -31,17 +32,19 @@ type MoveTablesWorkflow struct {
targetKs string
srcKs string
tables string
tabletTypes []string
}

// NewMoveTables creates a new MoveTablesWorkflow.
func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string) *MoveTablesWorkflow {
func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string, tabletTypes []string) *MoveTablesWorkflow {
return &MoveTablesWorkflow{
t: t,
clusterInstance: clusterInstance,
workflowName: workflowName,
tables: tables,
targetKs: targetKs,
srcKs: srcKs,
tabletTypes: tabletTypes,
}
}

Expand All @@ -52,6 +55,10 @@ func (mtw *MoveTablesWorkflow) Create() (string, error) {
} else {
args = append(args, "--all-tables")
}
if len(mtw.tabletTypes) != 0 {
args = append(args, "--tablet-types")
args = append(args, strings.Join(mtw.tabletTypes, ","))
}
return mtw.exec(args...)
}

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func (vttablet *VttabletProcess) WaitForVReplicationToCatchup(t testing.TB, work
for ind, query := range queries {
waitDuration := 500 * time.Millisecond
for duration > 0 {
log.Infof("Executing query %s on %s", query, vttablet.Name)
log.Infof("Executing query %s on %s", query, vttablet.TabletPath)
lastChecked = time.Now()
qr, err := executeQuery(conn, query)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/schema"
)

Expand Down Expand Up @@ -462,7 +463,7 @@ func moveTablesFuzzer(t *testing.T) {
require.NoError(t, err)
}
log.Errorf("MoveTables from - %v to %v", srcKeyspace, targetKeyspace)
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update")
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update", []string{topodatapb.TabletType_REPLICA.String()})
// Initiate MoveTables for twopc_fuzzer_update.
output, err := mtw.Create()
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/schema"
)

Expand Down Expand Up @@ -258,7 +259,7 @@ func mysqlRestartShard3(t *testing.T) error {
// moveTablesCancel runs a move tables command that we cancel in the end.
func moveTablesCancel(t *testing.T) error {
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()})
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
Expand All @@ -277,7 +278,7 @@ func moveTablesCancel(t *testing.T) error {
// moveTablesComplete runs a move tables command that we complete in the end.
func moveTablesComplete(t *testing.T) error {
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()})
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
Expand All @@ -297,7 +298,7 @@ func moveTablesReset(t *testing.T) {
err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema)
require.NoError(t, err)
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1")
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()})
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
Expand Down
17 changes: 13 additions & 4 deletions go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
)

const (
Expand All @@ -37,17 +38,25 @@ const (
// ClearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query,
// so we have to do the deletions iteratively.
func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
t.Fatalf("Timeout out waiting for table to be cleared - %v", tableName)
return
default:
}
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
fmt.Printf("Error in connection - %v\n", err)
log.Errorf("Error in connection - %v\n", err)
time.Sleep(100 * time.Millisecond)
continue
}

res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false)
if err != nil {
fmt.Printf("Error in selecting - %v\n", err)
log.Errorf("Error in selecting - %v\n", err)
conn.Close()
time.Sleep(100 * time.Millisecond)
continue
Expand All @@ -63,7 +72,7 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
_, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false)
conn.Close()
if err != nil {
fmt.Printf("Error in cleanup deletion - %v\n", err)
log.Errorf("Error in cleanup deletion - %v\n", err)
time.Sleep(100 * time.Millisecond)
continue
}
Expand Down

0 comments on commit 5ced946

Please sign in to comment.