Skip to content

Commit

Permalink
feat: run move tables in stress test
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 29, 2024
1 parent d916e81 commit 07dee3c
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 12 deletions.
94 changes: 94 additions & 0 deletions go/test/endtoend/cluster/move_tables.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"fmt"
"testing"
"time"
)

// MoveTablesWorkflow is used to store the information needed to run
// MoveTables commands.
type MoveTablesWorkflow struct {
t *testing.T
clusterInstance *LocalProcessCluster
workflowName string
targetKs string
srcKs string
tables string
}

// NewMoveTables creates a new MoveTablesWorkflow.
func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string) *MoveTablesWorkflow {
return &MoveTablesWorkflow{
t: t,
clusterInstance: clusterInstance,
workflowName: workflowName,
tables: tables,
targetKs: targetKs,
srcKs: srcKs,
}
}

func (mtw *MoveTablesWorkflow) Create() (string, error) {
args := []string{"Create", "--source-keyspace=" + mtw.srcKs}
if mtw.tables != "" {
args = append(args, "--tables="+mtw.tables)
} else {
args = append(args, "--all-tables")
}
return mtw.exec(args...)
}

func (mtw *MoveTablesWorkflow) exec(args ...string) (string, error) {
args2 := []string{"MoveTables", "--workflow=" + mtw.workflowName, "--target-keyspace=" + mtw.targetKs}
args2 = append(args2, args...)
return mtw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...)
}

func (mtw *MoveTablesWorkflow) SwitchReadsAndWrites() (string, error) {
return mtw.exec("SwitchTraffic")
}

func (mtw *MoveTablesWorkflow) ReverseReadsAndWrites() (string, error) {
return mtw.exec("ReverseTraffic")
}

func (mtw *MoveTablesWorkflow) Cancel() (string, error) {
return mtw.exec("Cancel")
}

func (mtw *MoveTablesWorkflow) Complete() (string, error) {
return mtw.exec("Complete")
}

func (mtw *MoveTablesWorkflow) Show() (string, error) {
return mtw.exec("Show")
}

func (mtw *MoveTablesWorkflow) WaitForVreplCatchup() {
for _, ks := range mtw.clusterInstance.Keyspaces {
if ks.Name != mtw.targetKs {
continue
}
for _, shard := range ks.Shards {
vttablet := shard.PrimaryTablet().VttabletProcess
vttablet.WaitForVReplicationToCatchup(mtw.t, mtw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", 10*time.Second)
}
}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func onlineDDLFuzzer(t *testing.T) {
return
}
fmt.Println("Running online DDL with uuid: ", output)
WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
}

func mysqlRestarts(t *testing.T) {
Expand Down
28 changes: 19 additions & 9 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
vtgateGrpcAddress string
keyspaceName = "ks"
cell = "zone1"
hostname = "localhost"
sidecarDBName = "vt_ks"
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
vtgateGrpcAddress string
keyspaceName = "ks"
unshardedKeyspaceName = "uks"
cell = "zone1"
hostname = "localhost"

//go:embed schema.sql
SchemaSQL string
Expand Down Expand Up @@ -79,18 +79,28 @@ func TestMain(m *testing.M) {
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
SidecarDBName: sidecarDBName,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
return 1
}

// Start an unsharded keyspace
unshardedKeyspace := &cluster.Keyspace{
Name: unshardedKeyspaceName,
SchemaSQL: "",
VSchema: "{}",
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil {
return 1
}

// Start Vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = clusterInstance.GetVTParams(keyspaceName)
vtParams = clusterInstance.GetVTParams("")
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)

return m.Run()
Expand Down
32 changes: 30 additions & 2 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func TestDisruptions(t *testing.T) {
commitDelayTime: "20",
disruption: onlineDDL,
},
{
disruptionName: "MoveTables",
commitDelayTime: "20",
disruption: moveTables,
},
{
disruptionName: "EmergencyReparentShard",
commitDelayTime: "5",
Expand Down Expand Up @@ -239,6 +244,29 @@ func mysqlRestartShard3(t *testing.T) error {
return syscallutil.Kill(pid, syscall.SIGKILL)
}

// moveTables runs a move tables command.
func moveTables(t *testing.T) error {
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
// Initiate MoveTables for twopc_t1.
_, err := mtw.Create()
require.NoError(t, err)
// Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows.
mtw.WaitForVreplCatchup()
// SwitchTraffic
_, err = mtw.SwitchReadsAndWrites()
require.NoError(t, err)
// Wait for a couple of seconds and then switch the traffic back
time.Sleep(2 * time.Second)
_, err = mtw.ReverseReadsAndWrites()
require.NoError(t, err)
// Wait another couple of seconds and then cancel the workflow
time.Sleep(2 * time.Second)
_, err = mtw.Cancel()
require.NoError(t, err)
return nil
}

var orderedDDL = []string{
"alter table twopc_t1 add column extra_col1 varchar(20)",
"alter table twopc_t1 add column extra_col2 varchar(20)",
Expand All @@ -256,13 +284,13 @@ func onlineDDL(t *testing.T) error {
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
shardNames[shard.Name] = true
Expand Down

0 comments on commit 07dee3c

Please sign in to comment.