Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce flakiness in TwoPC testing #16712

Merged
merged 4 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion go/test/endtoend/cluster/move_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cluster

import (
"fmt"
"strings"
"testing"
"time"
)
Expand All @@ -31,17 +32,19 @@ type MoveTablesWorkflow struct {
targetKs string
srcKs string
tables string
tabletTypes []string
}

// NewMoveTables creates a new MoveTablesWorkflow.
func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string) *MoveTablesWorkflow {
func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string, tabletTypes []string) *MoveTablesWorkflow {
return &MoveTablesWorkflow{
t: t,
clusterInstance: clusterInstance,
workflowName: workflowName,
tables: tables,
targetKs: targetKs,
srcKs: srcKs,
tabletTypes: tabletTypes,
}
}

Expand All @@ -52,6 +55,10 @@ func (mtw *MoveTablesWorkflow) Create() (string, error) {
} else {
args = append(args, "--all-tables")
}
if len(mtw.tabletTypes) != 0 {
args = append(args, "--tablet-types")
args = append(args, strings.Join(mtw.tabletTypes, ","))
}
return mtw.exec(args...)
}

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func (vttablet *VttabletProcess) WaitForVReplicationToCatchup(t testing.TB, work
for ind, query := range queries {
waitDuration := 500 * time.Millisecond
for duration > 0 {
log.Infof("Executing query %s on %s", query, vttablet.Name)
log.Infof("Executing query %s on %s", query, vttablet.TabletPath)
lastChecked = time.Now()
qr, err := executeQuery(conn, query)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/schema"
)

Expand Down Expand Up @@ -462,7 +463,7 @@ func moveTablesFuzzer(t *testing.T) {
require.NoError(t, err)
}
log.Errorf("MoveTables from - %v to %v", srcKeyspace, targetKeyspace)
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update")
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update", []string{topodatapb.TabletType_REPLICA.String()})
// Initiate MoveTables for twopc_fuzzer_update.
output, err := mtw.Create()
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/schema"
)

Expand Down Expand Up @@ -258,7 +259,7 @@ func mysqlRestartShard3(t *testing.T) error {
// moveTablesCancel runs a move tables command that we cancel in the end.
func moveTablesCancel(t *testing.T) error {
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()})
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
Expand All @@ -277,7 +278,7 @@ func moveTablesCancel(t *testing.T) error {
// moveTablesComplete runs a move tables command that we complete in the end.
func moveTablesComplete(t *testing.T) error {
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()})
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
Expand All @@ -297,7 +298,7 @@ func moveTablesReset(t *testing.T) {
err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema)
require.NoError(t, err)
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1")
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()})
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
Expand Down
17 changes: 13 additions & 4 deletions go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
)

const (
Expand All @@ -37,17 +38,25 @@ const (
// ClearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query,
// so we have to do the deletions iteratively.
func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
t.Fatalf("Timeout out waiting for table to be cleared - %v", tableName)
return
default:
}
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
fmt.Printf("Error in connection - %v\n", err)
log.Errorf("Error in connection - %v\n", err)
time.Sleep(100 * time.Millisecond)
continue
}

res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false)
if err != nil {
fmt.Printf("Error in selecting - %v\n", err)
log.Errorf("Error in selecting - %v\n", err)
conn.Close()
time.Sleep(100 * time.Millisecond)
continue
Expand All @@ -63,7 +72,7 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
_, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false)
conn.Close()
if err != nil {
fmt.Printf("Error in cleanup deletion - %v\n", err)
log.Errorf("Error in cleanup deletion - %v\n", err)
time.Sleep(100 * time.Millisecond)
continue
}
Expand Down
Loading