diff --git a/go/test/endtoend/cluster/move_tables.go b/go/test/endtoend/cluster/move_tables.go index c80f22cf5f5..6886a9dbb49 100644 --- a/go/test/endtoend/cluster/move_tables.go +++ b/go/test/endtoend/cluster/move_tables.go @@ -18,6 +18,7 @@ package cluster import ( "fmt" + "strings" "testing" "time" ) @@ -31,10 +32,11 @@ type MoveTablesWorkflow struct { targetKs string srcKs string tables string + tabletTypes []string } // NewMoveTables creates a new MoveTablesWorkflow. -func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string) *MoveTablesWorkflow { +func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string, tabletTypes []string) *MoveTablesWorkflow { return &MoveTablesWorkflow{ t: t, clusterInstance: clusterInstance, @@ -42,6 +44,7 @@ func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowN tables: tables, targetKs: targetKs, srcKs: srcKs, + tabletTypes: tabletTypes, } } @@ -52,6 +55,10 @@ func (mtw *MoveTablesWorkflow) Create() (string, error) { } else { args = append(args, "--all-tables") } + if len(mtw.tabletTypes) != 0 { + args = append(args, "--tablet-types") + args = append(args, strings.Join(mtw.tabletTypes, ",")) + } return mtw.exec(args...) } diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index 2c6f467bb08..6c7a85ec533 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -637,7 +637,7 @@ func (vttablet *VttabletProcess) WaitForVReplicationToCatchup(t testing.TB, work for ind, query := range queries { waitDuration := 500 * time.Millisecond for duration > 0 { - log.Infof("Executing query %s on %s", query, vttablet.Name) + log.Infof("Executing query %s on %s", query, vttablet.TabletPath) lastChecked = time.Now() qr, err := executeQuery(conn, query) if err != nil { diff --git a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go index 3fc1858fae4..596b54d55e3 100644 --- a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/syscallutil" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/schema" ) @@ -462,7 +463,7 @@ func moveTablesFuzzer(t *testing.T) { require.NoError(t, err) } log.Errorf("MoveTables from - %v to %v", srcKeyspace, targetKeyspace) - mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update") + mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update", []string{topodatapb.TabletType_REPLICA.String()}) // Initiate MoveTables for twopc_fuzzer_update. output, err := mtw.Create() if err != nil { diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go index 5baee342bec..f9f46ab4b40 100644 --- a/go/test/endtoend/transaction/twopc/stress/stress_test.go +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -39,6 +39,7 @@ import ( twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/schema" ) @@ -258,7 +259,7 @@ func mysqlRestartShard3(t *testing.T) error { // moveTablesCancel runs a move tables command that we cancel in the end. func moveTablesCancel(t *testing.T) error { workflow := "TestDisruptions" - mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1") + mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()}) // Initiate MoveTables for twopc_t1. output, err := mtw.Create() require.NoError(t, err, output) @@ -277,7 +278,7 @@ func moveTablesCancel(t *testing.T) error { // moveTablesComplete runs a move tables command that we complete in the end. func moveTablesComplete(t *testing.T) error { workflow := "TestDisruptions" - mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1") + mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()}) // Initiate MoveTables for twopc_t1. output, err := mtw.Create() require.NoError(t, err, output) @@ -297,7 +298,7 @@ func moveTablesReset(t *testing.T) { err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema) require.NoError(t, err) workflow := "TestDisruptions" - mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1") + mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1", []string{topodatapb.TabletType_REPLICA.String()}) // Initiate MoveTables for twopc_t1. output, err := mtw.Create() require.NoError(t, err, output) diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index 9d0ef838eb6..02c09a796df 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" ) const ( @@ -37,17 +38,25 @@ const ( // ClearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, // so we have to do the deletions iteratively. func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() for { + select { + case <-ctx.Done(): + t.Fatalf("Timeout out waiting for table to be cleared - %v", tableName) + return + default: + } conn, err := mysql.Connect(ctx, &vtParams) if err != nil { - fmt.Printf("Error in connection - %v\n", err) + log.Errorf("Error in connection - %v\n", err) + time.Sleep(100 * time.Millisecond) continue } res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) if err != nil { - fmt.Printf("Error in selecting - %v\n", err) + log.Errorf("Error in selecting - %v\n", err) conn.Close() time.Sleep(100 * time.Millisecond) continue @@ -63,7 +72,7 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) conn.Close() if err != nil { - fmt.Printf("Error in cleanup deletion - %v\n", err) + log.Errorf("Error in cleanup deletion - %v\n", err) time.Sleep(100 * time.Millisecond) continue }