Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve query buffering behavior during MoveTables traffic switching #15701

Merged
merged 14 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- **[Flag changes](#flag-changes)**
- [`pprof-http` default change](#pprof-http-default)
- [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag)
- [New minimum for `--buffer_min_time_between_failovers`](#buffer_min_time_between_failovers-flag)
- [New `track-udfs` vtgate flag](#vtgate-track-udfs-flag)
- **[Minor Changes](#minor-changes)**
- **[New Stats](#new-stats)**
Expand Down Expand Up @@ -214,6 +215,10 @@ To continue enabling these endpoints, explicitly set `--pprof-http` when startin

The new `--healthcheck-dial-concurrency` flag defines the maximum number of healthcheck connections that can open concurrently. This limit is to avoid hitting Go runtime panics on deployments watching enough tablets [to hit the runtime's maximum thread limit of `10000`](https://pkg.go.dev/runtime/debug#SetMaxThreads) due to blocking network syscalls. This flag applies to `vtcombo`, `vtctld` and `vtgate` only and a value less than the runtime max thread limit _(`10000`)_ is recommended.

#### <a id="buffer_min_time_between_failovers-flag"/>New minimum for `--buffer_min_time_between_failovers`

The `--buffer_min_time_between_failovers` `vttablet` flag now has a minimum value of `1s`. This is because a value of 0 can cause issues with the buffering mechanics resulting in unexpected and unnecessary query errors — in particular during `MoveTables SwitchTraffic` operations. If you are currently specifying a value of 0 for this flag then you will need to update the config value to 1s *prior to upgrading to v20 or later* as `vttablet` will report an error and terminate if you attempt to start it with a value of 0.

#### <a id="vtgate-track-udfs-flag"/>New `--track-udfs` vtgate flag

The new `--track-udfs` flag enables VTGate to track user defined functions for better planning.
Expand Down
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ var (
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr,
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr}
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(),
"--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
"--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}
Expand Down
127 changes: 74 additions & 53 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ package vreplication

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"os/exec"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -121,9 +122,10 @@ func getConnectionNoError(t *testing.T, hostname string, port int) *mysql.Conn {

func getConnection(t *testing.T, hostname string, port int) *mysql.Conn {
vtParams := mysql.ConnParams{
Host: hostname,
Port: port,
Uname: "vt_dba",
Host: hostname,
Port: port,
Uname: "vt_dba",
ConnectTimeoutMs: 1000,
}
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
Expand Down Expand Up @@ -803,92 +805,111 @@ func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
}

const (
loadTestBufferingWindowDurationStr = "30s"
loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr
loadTestWaitForCancel = 30 * time.Second
loadTestWaitBetweenQueries = 2 * time.Millisecond
loadTestBufferingWindowDuration = 10 * time.Second
loadTestAvgWaitBetweenQueries = 500 * time.Microsecond
loadTestDefaultConnections = 100
)

type loadGenerator struct {
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
connections int
wg sync.WaitGroup
}

func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator {
return &loadGenerator{
t: t,
vc: vc,
t: t,
vc: vc,
connections: loadTestDefaultConnections,
}
}

func (lg *loadGenerator) stop() {
time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
// Wait for buffering to stop and additional records to be inserted by start
// after traffic is switched.
time.Sleep(loadTestBufferingWindowDuration * 2)
log.Infof("Canceling load")
lg.cancel()
time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect
lg.wg.Wait()
}

func (lg *loadGenerator) start() {
t := lg.t
lg.ctx, lg.cancel = context.WithCancel(context.Background())
var connectionCount atomic.Int64

var id int64
log.Infof("startLoad: starting")
log.Infof("loadGenerator: starting")
queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')"
var totalQueries, successfulQueries int64
var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64
lg.wg.Add(1)
defer func() {

log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
defer lg.wg.Done()
log.Infof("loadGenerator: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
}()
logOnce := true
for {
select {
case <-lg.ctx.Done():
log.Infof("startLoad: context cancelled")
log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
log.Infof("loadGenerator: context cancelled")
log.Infof("loadGenerator: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
require.Equal(t, int64(0), deniedErrors)
require.Equal(t, int64(0), otherErrors)
require.Equal(t, int64(0), reshardedErrors)
require.Equal(t, totalQueries, successfulQueries)
return
default:
go func() {
conn := vc.GetVTGateConn(t)
defer conn.Close()
atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, id, id)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// this can happen when a second keyspace is setup with the same tables, but there are no routing rules
// set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if logOnce {
log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
logOnce = false
if int(connectionCount.Load()) < lg.connections {
connectionCount.Add(1)
lg.wg.Add(1)
go func() {
defer lg.wg.Done()
defer connectionCount.Add(-1)
conn := vc.GetVTGateConn(t)
defer conn.Close()
for {
select {
case <-lg.ctx.Done():
return
default:
}
atomic.AddInt64(&otherErrors, 1)
newID := atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, newID, newID)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
if debugMode {
t.Logf("loadGenerator: denied tables error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// This can happen when a second keyspace is setup with the same tables, but
// there are no routing rules set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if debugMode {
t.Logf("loadGenerator: error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&otherErrors, 1)
}
} else {
atomic.AddInt64(&successfulQueries, 1)
}
time.Sleep(time.Duration(int64(float64(loadTestAvgWaitBetweenQueries.Microseconds()) * rand.Float64())))
}
time.Sleep(loadTestWaitBetweenQueries)
} else {
atomic.AddInt64(&successfulQueries, 1)
}
}()
time.Sleep(loadTestWaitBetweenQueries)
}()
}
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vreplication

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -33,8 +34,12 @@ func TestMoveTablesBuffering(t *testing.T) {
catchup(t, targetTab2, workflowName, "MoveTables")
vdiffSideBySide(t, ksWorkflow, "")
waitForLowLag(t, "customer", workflowName)
tstWorkflowSwitchReads(t, "", "")
tstWorkflowSwitchWrites(t)
for i := 0; i < 10; i++ {
tstWorkflowSwitchReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
tstWorkflowReverseReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
}
log.Infof("SwitchWrites done")
lg.stop()

Expand Down
6 changes: 6 additions & 0 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"testing"
"time"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

Expand Down Expand Up @@ -67,10 +68,12 @@ func testCancel(t *testing.T) {
mt.SwitchReadsAndWrites()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, true)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

mt.ReverseReadsAndWrites()
checkDenyList(targetKeyspace, true)
checkDenyList(sourceKeyspace, false)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

mt.Cancel()
checkDenyList(targetKeyspace, false)
Expand Down Expand Up @@ -123,6 +126,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
catchup(t, targetTab80Dash, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mt.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
mt.Complete()

emptyGlobalRoutingRules := "{}\n"
Expand Down Expand Up @@ -246,6 +250,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {

// Switch all traffic for the shard
mt80Dash.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

// Confirm global routing rules -- everything should still be routed
// to the source side, customer, globally.
Expand Down Expand Up @@ -331,6 +336,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
catchup(t, targetTabDash80, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mtDash80.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

// Confirm global routing rules: everything should still be routed
// to the source side, customer, globally.
Expand Down
Loading
Loading