Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve query buffering behavior during MoveTables traffic switching #15701

Merged
merged 14 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ var (
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr,
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr}
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(),
"--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
"--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}
Expand Down
127 changes: 74 additions & 53 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ package vreplication

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"os/exec"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -121,9 +122,10 @@ func getConnectionNoError(t *testing.T, hostname string, port int) *mysql.Conn {

func getConnection(t *testing.T, hostname string, port int) *mysql.Conn {
vtParams := mysql.ConnParams{
Host: hostname,
Port: port,
Uname: "vt_dba",
Host: hostname,
Port: port,
Uname: "vt_dba",
ConnectTimeoutMs: 1000,
}
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
Expand Down Expand Up @@ -803,92 +805,111 @@ func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
}

const (
loadTestBufferingWindowDurationStr = "30s"
loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr
loadTestWaitForCancel = 30 * time.Second
loadTestWaitBetweenQueries = 2 * time.Millisecond
loadTestBufferingWindowDuration = 10 * time.Second
loadTestAvgWaitBetweenQueries = 500 * time.Microsecond
loadTestDefaultConnections = 100
)

type loadGenerator struct {
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
connections int
wg sync.WaitGroup
}

func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator {
return &loadGenerator{
t: t,
vc: vc,
t: t,
vc: vc,
connections: loadTestDefaultConnections,
}
}

func (lg *loadGenerator) stop() {
time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
// Wait for buffering to stop and additional records to be inserted by start
// after traffic is switched.
time.Sleep(loadTestBufferingWindowDuration * 2)
log.Infof("Canceling load")
lg.cancel()
time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect
lg.wg.Wait()
}

func (lg *loadGenerator) start() {
t := lg.t
lg.ctx, lg.cancel = context.WithCancel(context.Background())
var connectionCount atomic.Int64

var id int64
log.Infof("startLoad: starting")
log.Infof("loadGenerator: starting")
queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')"
var totalQueries, successfulQueries int64
var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64
lg.wg.Add(1)
defer func() {

log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
defer lg.wg.Done()
log.Infof("loadGenerator: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
}()
logOnce := true
for {
select {
case <-lg.ctx.Done():
log.Infof("startLoad: context cancelled")
log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
log.Infof("loadGenerator: context cancelled")
log.Infof("loadGenerator: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
require.Equal(t, int64(0), deniedErrors)
require.Equal(t, int64(0), otherErrors)
require.Equal(t, int64(0), reshardedErrors)
require.Equal(t, totalQueries, successfulQueries)
return
default:
go func() {
conn := vc.GetVTGateConn(t)
defer conn.Close()
atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, id, id)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// this can happen when a second keyspace is setup with the same tables, but there are no routing rules
// set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if logOnce {
log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
logOnce = false
if int(connectionCount.Load()) < lg.connections {
connectionCount.Add(1)
lg.wg.Add(1)
go func() {
defer lg.wg.Done()
defer connectionCount.Add(-1)
conn := vc.GetVTGateConn(t)
defer conn.Close()
for {
select {
case <-lg.ctx.Done():
return
default:
}
atomic.AddInt64(&otherErrors, 1)
newID := atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, newID, newID)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
if debugMode {
t.Logf("loadGenerator: denied tables error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// This can happen when a second keyspace is setup with the same tables, but
// there are no routing rules set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if debugMode {
t.Logf("loadGenerator: error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&otherErrors, 1)
}
} else {
atomic.AddInt64(&successfulQueries, 1)
}
time.Sleep(time.Duration(int64(float64(loadTestAvgWaitBetweenQueries.Microseconds()) * rand.Float64())))
}
time.Sleep(loadTestWaitBetweenQueries)
} else {
atomic.AddInt64(&successfulQueries, 1)
}
}()
time.Sleep(loadTestWaitBetweenQueries)
}()
}
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vreplication

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -33,8 +34,12 @@ func TestMoveTablesBuffering(t *testing.T) {
catchup(t, targetTab2, workflowName, "MoveTables")
vdiffSideBySide(t, ksWorkflow, "")
waitForLowLag(t, "customer", workflowName)
tstWorkflowSwitchReads(t, "", "")
tstWorkflowSwitchWrites(t)
for i := 0; i < 10; i++ {
tstWorkflowSwitchReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
tstWorkflowReverseReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
}
log.Infof("SwitchWrites done")
lg.stop()

Expand Down
6 changes: 6 additions & 0 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"testing"
"time"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

Expand Down Expand Up @@ -67,10 +68,12 @@ func testCancel(t *testing.T) {
mt.SwitchReadsAndWrites()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, true)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

mt.ReverseReadsAndWrites()
checkDenyList(targetKeyspace, true)
checkDenyList(sourceKeyspace, false)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

mt.Cancel()
checkDenyList(targetKeyspace, false)
Expand Down Expand Up @@ -123,6 +126,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
catchup(t, targetTab80Dash, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mt.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
mt.Complete()

emptyGlobalRoutingRules := "{}\n"
Expand Down Expand Up @@ -246,6 +250,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {

// Switch all traffic for the shard
mt80Dash.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

// Confirm global routing rules -- everything should still be routed
// to the source side, customer, globally.
Expand Down Expand Up @@ -331,6 +336,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
catchup(t, targetTabDash80, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mtDash80.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

// Confirm global routing rules: everything should still be routed
// to the source side, customer, globally.
Expand Down
Loading
Loading