Move tables with atomic transactions (#16676)
Signed-off-by: Manan Gupta <[email protected]>
GuptaManan100 authored Sep 3, 2024
1 parent 9137c5b commit b2ec6a5
Showing 7 changed files with 245 additions and 25 deletions.
12 changes: 11 additions & 1 deletion doc/design-docs/AtomicTransactionsWithDisruptions.md
@@ -48,4 +48,14 @@ If it finds no prepared transaction of the table, it moves forward with the cut-
In the Prepare code, we check the query rules before adding the transaction to the prepared list and re-check the rules before storing the transaction logs in the transaction redo table.
Any transaction that went past the first check will fail the second check if the cutover proceeds.

The check on both sides prevents either the cutover from proceeding or the transaction from being prepared.

## MoveTables

The only step of a `MoveTables` workflow that needs to synchronize with atomic transactions is `SwitchTraffic` for writes. During this step, we want to disallow writes only to the tables being moved. We use `DeniedTables` in `ShardInfo` to accomplish this. After updating the topo server with the new `DeniedTables`, we make all the vttablets refresh their topo so that we know they have registered the change.

On the vttablet, the `DeniedTables` are used to add query rules very similar to the ones used for Online DDL. The only difference is that Online DDL buffers the queries, whereas `SwitchTraffic` fails them altogether. Adding these query rules prevents any new atomic transactions from being prepared.

Next, we lock the tables to ensure that no existing write is still pending. This step blocks until all open prepared transactions have succeeded.

After this step, `SwitchTraffic` can proceed safely: any new atomic transaction is guaranteed to be rejected until `DeniedTables` has been reset, and because we hold the table lock, we know that no write is currently in progress.
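
To make this concrete, here is a minimal sketch of the check-twice pattern described above (illustrative only: `deniedTables`, `checkQueryRules`, and `prepare` are hypothetical stand-ins for the real vttablet structures):

```go
package main

import "fmt"

// deniedTables stands in for the DeniedTables list that each vttablet
// refreshes from the ShardInfo in the topo server.
var deniedTables = map[string]bool{"twopc_t1": true}

// checkQueryRules rejects writes to tables that are denied while a
// SwitchTraffic is in progress.
func checkQueryRules(table string) error {
	if deniedTables[table] {
		return fmt.Errorf("writes to %s are denied during SwitchTraffic", table)
	}
	return nil
}

// prepare shows the check-twice pattern: once before the transaction is
// added to the prepared list, and again before its redo logs are stored.
// A SwitchTraffic that lands between the two checks fails the second one,
// so no transaction can slip into the prepared state mid-cutover.
func prepare(table string) error {
	if err := checkQueryRules(table); err != nil {
		return err
	}
	// ... add the transaction to the in-memory prepared list ...
	if err := checkQueryRules(table); err != nil {
		// ... roll the transaction back ...
		return err
	}
	// ... store the transaction in the redo log table ...
	return nil
}

func main() {
	fmt.Println(prepare("twopc_t1")) // rejected while the table is denied
}
```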
94 changes: 94 additions & 0 deletions go/test/endtoend/cluster/move_tables.go
@@ -0,0 +1,94 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"fmt"
"testing"
"time"
)

// MoveTablesWorkflow is used to store the information needed to run
// MoveTables commands.
type MoveTablesWorkflow struct {
t *testing.T
clusterInstance *LocalProcessCluster
workflowName string
targetKs string
srcKs string
tables string
}

// NewMoveTables creates a new MoveTablesWorkflow.
func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string) *MoveTablesWorkflow {
return &MoveTablesWorkflow{
t: t,
clusterInstance: clusterInstance,
workflowName: workflowName,
tables: tables,
targetKs: targetKs,
srcKs: srcKs,
}
}

func (mtw *MoveTablesWorkflow) Create() (string, error) {
args := []string{"Create", "--source-keyspace=" + mtw.srcKs}
if mtw.tables != "" {
args = append(args, "--tables="+mtw.tables)
} else {
args = append(args, "--all-tables")
}
return mtw.exec(args...)
}

func (mtw *MoveTablesWorkflow) exec(args ...string) (string, error) {
args2 := []string{"MoveTables", "--workflow=" + mtw.workflowName, "--target-keyspace=" + mtw.targetKs}
args2 = append(args2, args...)
return mtw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...)
}

func (mtw *MoveTablesWorkflow) SwitchReadsAndWrites() (string, error) {
return mtw.exec("SwitchTraffic")
}

func (mtw *MoveTablesWorkflow) ReverseReadsAndWrites() (string, error) {
return mtw.exec("ReverseTraffic")
}

func (mtw *MoveTablesWorkflow) Cancel() (string, error) {
return mtw.exec("Cancel")
}

func (mtw *MoveTablesWorkflow) Complete() (string, error) {
return mtw.exec("Complete")
}

func (mtw *MoveTablesWorkflow) Show() (string, error) {
return mtw.exec("Show")
}

func (mtw *MoveTablesWorkflow) WaitForVreplCatchup(timeToWait time.Duration) {
for _, ks := range mtw.clusterInstance.Keyspaces {
if ks.Name != mtw.targetKs {
continue
}
for _, shard := range ks.Shards {
vttablet := shard.PrimaryTablet().VttabletProcess
vttablet.WaitForVReplicationToCatchup(mtw.t, mtw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", timeToWait)
}
}
}
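
For orientation, a typical test drives this helper end to end roughly as follows (a sketch, not part of this commit: the workflow, keyspace, and table names are made up, and testify's `require` is assumed to be imported):

// runMoveTables sketches the usual lifecycle: create the workflow, let
// vreplication catch up, switch traffic, then finalize.
func runMoveTables(t *testing.T, c *LocalProcessCluster) {
	mtw := NewMoveTables(t, c, "exampleWorkflow", "target_ks", "source_ks", "example_table")

	// Create the workflow and let vreplication copy the existing rows.
	output, err := mtw.Create()
	require.NoError(t, err, output)
	mtw.WaitForVreplCatchup(30 * time.Second)

	// Cut traffic over to the target keyspace, then finalize the move.
	output, err = mtw.SwitchReadsAndWrites()
	require.NoError(t, err, output)
	output, err = mtw.Complete()
	require.NoError(t, err, output)
}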
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/vttablet_process.go
@@ -611,6 +611,7 @@ func (vttablet *VttabletProcess) getDBSystemValues(placeholder string, value str

// WaitForVReplicationToCatchup waits for "workflow" to finish copying
func (vttablet *VttabletProcess) WaitForVReplicationToCatchup(t testing.TB, workflow, database string, sidecarDBName string, duration time.Duration) {
t.Helper()
if sidecarDBName == "" {
sidecarDBName = sidecar.DefaultName
}
3 changes: 2 additions & 1 deletion go/test/endtoend/onlineddl/vtgate_util.go
@@ -257,7 +257,8 @@ func CheckForceMigrationCutOver(t *testing.T, vtParams *mysql.ConnParams, shards

// CheckMigrationStatus verifies that the migration indicated by the given UUID has one of the given expected statuses
func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) bool {
-query, err := sqlparser.ParseAndBind("show vitess_migrations like %a",
+ksName := shards[0].PrimaryTablet().VttabletProcess.Keyspace
+query, err := sqlparser.ParseAndBind(fmt.Sprintf("show vitess_migrations from %s like %%a", ksName),
sqltypes.StringBindVariable(uuid),
)
require.NoError(t, err)
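
For reference, with a keyspace named `ks` and a made-up UUID, the new keyspace-qualified form binds to roughly the following (a sketch using the same `sqlparser` and `sqltypes` helpers as the code above):

// Yields approximately:
//   show vitess_migrations from ks like 'aaaa_bbbb_cccc'
query, err := sqlparser.ParseAndBind(
	fmt.Sprintf("show vitess_migrations from %s like %%a", "ks"),
	sqltypes.StringBindVariable("aaaa_bbbb_cccc"),
)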
45 changes: 39 additions & 6 deletions go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
@@ -29,6 +29,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"

@@ -102,12 +103,12 @@ func TestTwoPCFuzzTest(t *testing.T) {
timeForTesting: 5 * time.Second,
},
{
-name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL disruptions",
-threads: 15,
-updateSets: 15,
+name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL, MoveTables disruptions",
+threads: 4,
+updateSets: 4,
timeForTesting: 5 * time.Second,
-clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer},
-disruptionProbability: []int{5, 5, 5, 5, 5},
+clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer, moveTablesFuzzer},
+disruptionProbability: []int{5, 5, 5, 5, 5, 5},
},
}

@@ -443,7 +444,39 @@ func onlineDDLFuzzer(t *testing.T) {
return
}
fmt.Println("Running online DDL with uuid: ", output)
-WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
}

var moveTablesCount int

// moveTablesFuzzer runs a MoveTables workflow.
func moveTablesFuzzer(t *testing.T) {
workflow := "TestTwoPCFuzzTest"
srcKeyspace := keyspaceName
targetKeyspace := unshardedKeyspaceName
if moveTablesCount%2 == 1 {
srcKeyspace = unshardedKeyspaceName
targetKeyspace = keyspaceName
// We apply the vschema again because the previous MoveTables run would have removed the entry for `twopc_fuzzer_update`.
err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema)
require.NoError(t, err)
}
log.Errorf("MoveTables from - %v to %v", srcKeyspace, targetKeyspace)
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update")
// Initiate MoveTables for twopc_fuzzer_update.
output, err := mtw.Create()
if err != nil {
log.Errorf("error creating MoveTables - %v, output - %v", err, output)
return
}
moveTablesCount++
// Wait for vreplication to catch up. This should be very fast since we don't have a lot of rows.
mtw.WaitForVreplCatchup(1 * time.Minute)
// SwitchTraffic
output, err = mtw.SwitchReadsAndWrites()
assert.NoError(t, err, output)
output, err = mtw.Complete()
assert.NoError(t, err, output)
}

func mysqlRestarts(t *testing.T) {
28 changes: 19 additions & 9 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
@@ -32,13 +32,13 @@ import (
)

var (
-clusterInstance *cluster.LocalProcessCluster
-vtParams mysql.ConnParams
-vtgateGrpcAddress string
-keyspaceName = "ks"
-cell = "zone1"
-hostname = "localhost"
-sidecarDBName = "vt_ks"
+clusterInstance *cluster.LocalProcessCluster
+vtParams mysql.ConnParams
+vtgateGrpcAddress string
+keyspaceName = "ks"
+unshardedKeyspaceName = "uks"
+cell = "zone1"
+hostname = "localhost"

//go:embed schema.sql
SchemaSQL string
@@ -79,18 +79,28 @@ func TestMain(m *testing.M) {
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
-SidecarDBName: sidecarDBName,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
return 1
}

// Start an unsharded keyspace
unshardedKeyspace := &cluster.Keyspace{
Name: unshardedKeyspaceName,
SchemaSQL: "",
VSchema: "{}",
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil {
return 1
}

// Start Vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
-vtParams = clusterInstance.GetVTParams(keyspaceName)
+vtParams = clusterInstance.GetVTParams("")
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)

return m.Run()
87 changes: 79 additions & 8 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -48,6 +48,7 @@ func TestDisruptions(t *testing.T) {
disruptionName string
commitDelayTime string
disruption func(t *testing.T) error
resetFunc func(t *testing.T)
}{
{
disruptionName: "No Disruption",
@@ -76,6 +77,17 @@
commitDelayTime: "20",
disruption: onlineDDL,
},
{
disruptionName: "MoveTables - Complete",
commitDelayTime: "10",
disruption: moveTablesComplete,
resetFunc: moveTablesReset,
},
{
disruptionName: "MoveTables - Cancel",
commitDelayTime: "10",
disruption: moveTablesCancel,
},
{
disruptionName: "EmergencyReparentShard",
commitDelayTime: "5",
@@ -136,6 +148,10 @@
waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second)
writeCancel()
writerWg.Wait()

if tt.resetFunc != nil {
tt.resetFunc(t)
}
})
}
}
@@ -239,6 +255,61 @@ func mysqlRestartShard3(t *testing.T) error {
return syscallutil.Kill(pid, syscall.SIGKILL)
}

// moveTablesCancel runs a MoveTables workflow that we cancel at the end.
func moveTablesCancel(t *testing.T) error {
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
// Wait for vreplication to catch up. This should be very fast since we don't have a lot of rows.
mtw.WaitForVreplCatchup(10 * time.Second)
// SwitchTraffic
output, err = mtw.SwitchReadsAndWrites()
require.NoError(t, err, output)
output, err = mtw.ReverseReadsAndWrites()
require.NoError(t, err, output)
output, err = mtw.Cancel()
require.NoError(t, err, output)
return nil
}

// moveTablesComplete runs a MoveTables workflow that we complete at the end.
func moveTablesComplete(t *testing.T) error {
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
// Wait for vreplication to catch up. This should be very fast since we don't have a lot of rows.
mtw.WaitForVreplCatchup(10 * time.Second)
// SwitchTraffic
output, err = mtw.SwitchReadsAndWrites()
require.NoError(t, err, output)
output, err = mtw.Complete()
require.NoError(t, err, output)
return nil
}

// moveTablesReset moves the table back from the unsharded keyspace to the sharded one.
func moveTablesReset(t *testing.T) {
// We apply the vschema again because the previous MoveTables run would have removed the entry for `twopc_t1`.
err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema)
require.NoError(t, err)
workflow := "TestDisruptions"
mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1")
// Initiate MoveTables for twopc_t1.
output, err := mtw.Create()
require.NoError(t, err, output)
// Wait for vreplication to catch up. This should be very fast since we don't have a lot of rows.
mtw.WaitForVreplCatchup(10 * time.Second)
// SwitchTraffic
output, err = mtw.SwitchReadsAndWrites()
require.NoError(t, err, output)
output, err = mtw.Complete()
require.NoError(t, err, output)
}

var orderedDDL = []string{
"alter table twopc_t1 add column extra_col1 varchar(20)",
"alter table twopc_t1 add column extra_col2 varchar(20)",
@@ -256,18 +327,18 @@ func onlineDDL(t *testing.T) error {
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
-status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

-func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
+func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
shardNames[shard.Name] = true
}
-query := fmt.Sprintf("show vitess_migrations like '%s'", uuid)
+query := fmt.Sprintf("show vitess_migrations from %s like '%s'", keyspaceName, uuid)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
Expand All @@ -280,6 +351,11 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c

lastKnownStatus := ""
for {
+select {
+case <-ctx.Done():
+return schema.OnlineDDLStatus(lastKnownStatus)
+case <-ticker.C:
+}
countMatchedShards := 0
conn, err := mysql.Connect(ctx, vtParams)
if err != nil {
@@ -304,10 +380,5 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
-select {
-case <-ctx.Done():
-return schema.OnlineDDLStatus(lastKnownStatus)
-case <-ticker.C:
-}
}
}
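
Worth noting about the reordering above: with the `select` at the top of the loop, every iteration (including error paths that `continue`) first waits on the ticker or the context, so a persistent connection failure can no longer busy-spin the loop. The shape of the pattern as a standalone sketch (assuming the standard `context` and `time` imports):

// pollUntil sketches the wait-first polling loop: the select runs before
// the work, so every retry path still honors the interval and the deadline.
func pollUntil(ctx context.Context, interval time.Duration, check func() bool) bool {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return false // deadline hit before the condition was met
		case <-ticker.C:
		}
		if check() {
			return true
		}
	}
}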
