From 07dee3c183200da9de446d0a2908d84a58035646 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Thu, 29 Aug 2024 09:45:05 +0530
Subject: [PATCH] feat: run move tables in stress test

Signed-off-by: Manan Gupta
---
 go/test/endtoend/cluster/move_tables.go      | 94 +++++++++++++++++++
 .../transaction/twopc/stress/fuzzer_test.go  |  2 +-
 .../transaction/twopc/stress/main_test.go    | 28 ++++--
 .../transaction/twopc/stress/stress_test.go  | 32 ++++++-
 4 files changed, 144 insertions(+), 12 deletions(-)
 create mode 100644 go/test/endtoend/cluster/move_tables.go

diff --git a/go/test/endtoend/cluster/move_tables.go b/go/test/endtoend/cluster/move_tables.go
new file mode 100644
index 00000000000..96bc0108ce2
--- /dev/null
+++ b/go/test/endtoend/cluster/move_tables.go
@@ -0,0 +1,94 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cluster
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+// MoveTablesWorkflow is used to store the information needed to run
+// MoveTables commands.
+type MoveTablesWorkflow struct {
+	t               *testing.T
+	clusterInstance *LocalProcessCluster
+	workflowName    string
+	targetKs        string
+	srcKs           string
+	tables          string
+}
+
+// NewMoveTables creates a new MoveTablesWorkflow.
+func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string) *MoveTablesWorkflow {
+	return &MoveTablesWorkflow{
+		t:               t,
+		clusterInstance: clusterInstance,
+		workflowName:    workflowName,
+		tables:          tables,
+		targetKs:        targetKs,
+		srcKs:           srcKs,
+	}
+}
+
+func (mtw *MoveTablesWorkflow) Create() (string, error) {
+	args := []string{"Create", "--source-keyspace=" + mtw.srcKs}
+	if mtw.tables != "" {
+		args = append(args, "--tables="+mtw.tables)
+	} else {
+		args = append(args, "--all-tables")
+	}
+	return mtw.exec(args...)
+}
+
+func (mtw *MoveTablesWorkflow) exec(args ...string) (string, error) {
+	args2 := []string{"MoveTables", "--workflow=" + mtw.workflowName, "--target-keyspace=" + mtw.targetKs}
+	args2 = append(args2, args...)
+	return mtw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...)
+}
+
+func (mtw *MoveTablesWorkflow) SwitchReadsAndWrites() (string, error) {
+	return mtw.exec("SwitchTraffic")
+}
+
+func (mtw *MoveTablesWorkflow) ReverseReadsAndWrites() (string, error) {
+	return mtw.exec("ReverseTraffic")
+}
+
+func (mtw *MoveTablesWorkflow) Cancel() (string, error) {
+	return mtw.exec("Cancel")
+}
+
+func (mtw *MoveTablesWorkflow) Complete() (string, error) {
+	return mtw.exec("Complete")
+}
+
+func (mtw *MoveTablesWorkflow) Show() (string, error) {
+	return mtw.exec("Show")
+}
+
+func (mtw *MoveTablesWorkflow) WaitForVreplCatchup() {
+	for _, ks := range mtw.clusterInstance.Keyspaces {
+		if ks.Name != mtw.targetKs {
+			continue
+		}
+		for _, shard := range ks.Shards {
+			vttablet := shard.PrimaryTablet().VttabletProcess
+			vttablet.WaitForVReplicationToCatchup(mtw.t, mtw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", 10*time.Second)
+		}
+	}
+}
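
Note: the helper above is a thin wrapper. Each method only assembles vtctldclient arguments and hands them to VtctldClientProcess.ExecuteCommandWithOutput, which supplies --server and the other default flags itself. As an illustrative sketch (not part of the patch), this is roughly what mtw.Create() expands to for a hypothetical workflow "wf" moving twopc_t1 from "ks" to "uks"; the function name is made up and the snippet assumes the usual clusterInstance test global and testify's require package:

// Sketch only: the raw invocation equivalent to mtw.Create() for workflow "wf",
// source keyspace "ks", target keyspace "uks", tables "twopc_t1".
func createMoveTablesDirectly(t *testing.T) {
	out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
		"MoveTables", "--workflow=wf", "--target-keyspace=uks",
		"Create", "--source-keyspace=ks", "--tables=twopc_t1",
	)
	require.NoError(t, err, out)
}
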
diff --git a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
index 932fcae1217..3ba29e50713 100644
--- a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
@@ -443,7 +443,7 @@ func onlineDDLFuzzer(t *testing.T) {
 		return
 	}
 	fmt.Println("Running online DDL with uuid: ", output)
-	WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+	waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
 }
 
 func mysqlRestarts(t *testing.T) {
diff --git a/go/test/endtoend/transaction/twopc/stress/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go
index ef8e454be15..05525171d2d 100644
--- a/go/test/endtoend/transaction/twopc/stress/main_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/main_test.go
@@ -32,13 +32,13 @@ import (
 )
 
 var (
-	clusterInstance   *cluster.LocalProcessCluster
-	vtParams          mysql.ConnParams
-	vtgateGrpcAddress string
-	keyspaceName      = "ks"
-	cell              = "zone1"
-	hostname          = "localhost"
-	sidecarDBName     = "vt_ks"
+	clusterInstance       *cluster.LocalProcessCluster
+	vtParams              mysql.ConnParams
+	vtgateGrpcAddress     string
+	keyspaceName          = "ks"
+	unshardedKeyspaceName = "uks"
+	cell                  = "zone1"
+	hostname              = "localhost"
 
 	//go:embed schema.sql
 	SchemaSQL string
@@ -79,18 +79,28 @@
 			Name:             keyspaceName,
 			SchemaSQL:        SchemaSQL,
 			VSchema:          VSchema,
-			SidecarDBName:    sidecarDBName,
 			DurabilityPolicy: "semi_sync",
 		}
 		if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
 			return 1
 		}
 
+		// Start an unsharded keyspace
+		unshardedKeyspace := &cluster.Keyspace{
+			Name:             unshardedKeyspaceName,
+			SchemaSQL:        "",
+			VSchema:          "{}",
+			DurabilityPolicy: "semi_sync",
+		}
+		if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil {
+			return 1
+		}
+
 		// Start Vtgate
 		if err := clusterInstance.StartVtgate(); err != nil {
 			return 1
 		}
-		vtParams = clusterInstance.GetVTParams(keyspaceName)
+		vtParams = clusterInstance.GetVTParams("")
 		vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)
 
 		return m.Run()
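
Note: with the default keyspace dropped from vtParams (GetVTParams("") instead of GetVTParams(keyspaceName)), the test connections no longer pin a keyspace, so unqualified table names are routed through the vschema and keep resolving while MoveTables shifts twopc_t1 between ks and uks. A minimal sketch of what that looks like from a test, assuming the stress package's vtParams global and its usual imports; the function name and query are illustrative, not part of the patch:

func queryAcrossKeyspaces(t *testing.T) {
	ctx := context.Background()
	conn, err := mysql.Connect(ctx, &vtParams)
	require.NoError(t, err)
	defer conn.Close()

	// No default keyspace is set, so vtgate resolves twopc_t1 via the vschema,
	// whether it currently lives in "ks" or has been moved to "uks".
	_, err = conn.ExecuteFetch("select count(*) from twopc_t1", 1, false)
	require.NoError(t, err)
}
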
diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go
index 4dae0156b9d..6db0687a5ab 100644
--- a/go/test/endtoend/transaction/twopc/stress/stress_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -76,6 +76,11 @@ func TestDisruptions(t *testing.T) {
 			commitDelayTime: "20",
 			disruption:      onlineDDL,
 		},
+		{
+			disruptionName:  "MoveTables",
+			commitDelayTime: "20",
+			disruption:      moveTables,
+		},
 		{
 			disruptionName:  "EmergencyReparentShard",
 			commitDelayTime: "5",
@@ -239,6 +244,29 @@ func mysqlRestartShard3(t *testing.T) error {
 	return syscallutil.Kill(pid, syscall.SIGKILL)
 }
 
+// moveTables runs a move tables command.
+func moveTables(t *testing.T) error {
+	workflow := "TestDisruptions"
+	mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
+	// Initiate MoveTables for twopc_t1.
+	_, err := mtw.Create()
+	require.NoError(t, err)
+	// Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows.
+	mtw.WaitForVreplCatchup()
+	// SwitchTraffic
+	_, err = mtw.SwitchReadsAndWrites()
+	require.NoError(t, err)
+	// Wait for a couple of seconds and then switch the traffic back
+	time.Sleep(2 * time.Second)
+	_, err = mtw.ReverseReadsAndWrites()
+	require.NoError(t, err)
+	// Wait another couple of seconds and then cancel the workflow
+	time.Sleep(2 * time.Second)
+	_, err = mtw.Cancel()
+	require.NoError(t, err)
+	return nil
+}
+
 var orderedDDL = []string{
 	"alter table twopc_t1 add column extra_col1 varchar(20)",
 	"alter table twopc_t1 add column extra_col2 varchar(20)",
@@ -256,13 +284,13 @@ func onlineDDL(t *testing.T) error {
 	require.NoError(t, err)
 	count++
 	fmt.Println("uuid: ", output)
-	status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+	status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
 	onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
 	require.Equal(t, schema.OnlineDDLStatusComplete, status)
 	return nil
 }
 
-func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
+func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
 	shardNames := map[string]bool{}
 	for _, shard := range shards {
 		shardNames[shard.Name] = true
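
Note: the moveTables disruption above always reverses traffic and cancels the workflow, so the cluster ends each test case in its original layout. A hypothetical variant that instead exercises the finalization path of the helper could look like the sketch below (not part of the patch); it would leave twopc_t1 in uks, so the table would have to be moved back before other disruptions reuse it:

// Sketch only: a hypothetical disruption that finalizes the move instead of
// cancelling it, using only the MoveTablesWorkflow API added in this patch.
func moveTablesAndComplete(t *testing.T) error {
	mtw := cluster.NewMoveTables(t, clusterInstance, "TestDisruptionsComplete", unshardedKeyspaceName, keyspaceName, "twopc_t1")
	_, err := mtw.Create()
	require.NoError(t, err)
	mtw.WaitForVreplCatchup()
	_, err = mtw.SwitchReadsAndWrites()
	require.NoError(t, err)
	// Let the switched traffic settle briefly, then finalize instead of cancelling.
	time.Sleep(2 * time.Second)
	_, err = mtw.Complete()
	require.NoError(t, err)
	return nil
}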