Skip to content

Commit

Permalink
Update e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Apr 12, 2024
1 parent 7063a15 commit 1f81b8a
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 67 deletions.
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ var (
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr,
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr}
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(),
"--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
"--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}
Expand Down
117 changes: 68 additions & 49 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ package vreplication

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"os/exec"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -121,9 +122,10 @@ func getConnectionNoError(t *testing.T, hostname string, port int) *mysql.Conn {

func getConnection(t *testing.T, hostname string, port int) *mysql.Conn {
vtParams := mysql.ConnParams{
Host: hostname,
Port: port,
Uname: "vt_dba",
Host: hostname,
Port: port,
Uname: "vt_dba",
ConnectTimeoutMs: 1000,
}
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
Expand Down Expand Up @@ -803,48 +805,51 @@ func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
}

const (
loadTestBufferingWindowDurationStr = "30s"
loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr
loadTestWaitForCancel = 30 * time.Second
loadTestWaitBetweenQueries = 2 * time.Millisecond
loadTestBufferingWindowDuration = 10 * time.Second
loadTestAvgWaitBetweenQueries = 500 * time.Microsecond
loadTestDefaultConnections = 100
)

type loadGenerator struct {
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
connections int
wg sync.WaitGroup
}

func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator {
return &loadGenerator{
t: t,
vc: vc,
t: t,
vc: vc,
connections: loadTestDefaultConnections,
}
}

func (lg *loadGenerator) stop() {
time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
time.Sleep(loadTestBufferingWindowDuration * 2) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
log.Infof("Canceling load")
lg.cancel()
time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect
lg.wg.Wait()
}

func (lg *loadGenerator) start() {
t := lg.t
lg.ctx, lg.cancel = context.WithCancel(context.Background())
var connectionCount atomic.Int64

var id int64
log.Infof("startLoad: starting")
queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')"
var totalQueries, successfulQueries int64
var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64
lg.wg.Add(1)
defer func() {

defer lg.wg.Done()
log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
}()
logOnce := true
for {
select {
case <-lg.ctx.Done():
Expand All @@ -853,42 +858,56 @@ func (lg *loadGenerator) start() {
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
require.Equal(t, int64(0), deniedErrors)
require.Equal(t, int64(0), otherErrors)
require.Equal(t, int64(0), reshardedErrors)
require.Equal(t, totalQueries, successfulQueries)
return
default:
go func() {
conn := vc.GetVTGateConn(t)
defer conn.Close()
atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, id, id)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// this can happen when a second keyspace is setup with the same tables, but there are no routing rules
// set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if logOnce {
log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
logOnce = false
if int(connectionCount.Load()) < lg.connections {
connectionCount.Add(1)
lg.wg.Add(1)
go func() {
defer lg.wg.Done()
defer connectionCount.Add(-1)
conn := vc.GetVTGateConn(t)
defer conn.Close()
for {
select {
case <-lg.ctx.Done():
return
default:
}
atomic.AddInt64(&otherErrors, 1)
newID := atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, newID, newID)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
if debugMode {
t.Logf("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// this can happen when a second keyspace is set up with the same tables, but there are no routing rules
// set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if debugMode {
t.Logf("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&otherErrors, 1)
}
} else {
atomic.AddInt64(&successfulQueries, 1)
}
time.Sleep(time.Duration(int64(float64(loadTestAvgWaitBetweenQueries.Microseconds()) * rand.Float64())))
}
time.Sleep(loadTestWaitBetweenQueries)
} else {
atomic.AddInt64(&successfulQueries, 1)
}
}()
time.Sleep(loadTestWaitBetweenQueries)
}()
}
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vreplication

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -33,8 +34,12 @@ func TestMoveTablesBuffering(t *testing.T) {
catchup(t, targetTab2, workflowName, "MoveTables")
vdiffSideBySide(t, ksWorkflow, "")
waitForLowLag(t, "customer", workflowName)
tstWorkflowSwitchReads(t, "", "")
tstWorkflowSwitchWrites(t)
for i := 0; i < 10; i++ {
tstWorkflowSwitchReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
tstWorkflowReverseReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
}
log.Infof("SwitchWrites done")
lg.stop()

Expand Down
6 changes: 6 additions & 0 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"testing"
"time"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

Expand Down Expand Up @@ -67,10 +68,12 @@ func testCancel(t *testing.T) {
mt.SwitchReadsAndWrites()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, true)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

mt.ReverseReadsAndWrites()
checkDenyList(targetKeyspace, true)
checkDenyList(sourceKeyspace, false)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

mt.Cancel()
checkDenyList(targetKeyspace, false)
Expand Down Expand Up @@ -123,6 +126,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
catchup(t, targetTab80Dash, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mt.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
mt.Complete()

emptyGlobalRoutingRules := "{}\n"
Expand Down Expand Up @@ -246,6 +250,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {

// Switch all traffic for the shard
mt80Dash.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

// Confirm global routing rules -- everything should still be routed
// to the source side, customer, globally.
Expand Down Expand Up @@ -331,6 +336,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
catchup(t, targetTabDash80, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mtDash80.SwitchReadsAndWrites()
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)

// Confirm global routing rules: everything should still be routed
// to the source side, customer, globally.
Expand Down
27 changes: 13 additions & 14 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e *Executor) newExecute(
execPlan planExec, // used when there is a plan to execute
recResult txResult, // used when it's something simple like begin/commit/rollback/savepoint
) (err error) {
// 1: Prepare before planning and execution
// 1: Prepare before planning and execution.

// Start an implicit transaction if necessary.
err = e.startTxIfNecessary(ctx, safeSession)
Expand All @@ -79,7 +79,7 @@ func (e *Executor) newExecute(

query, comments := sqlparser.SplitMarginComments(sql)

// 2: Parse and Validate query
// 2: Parse and Validate query.
stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
return err
Expand All @@ -106,28 +106,26 @@ func (e *Executor) newExecute(
// There is a race due to which the executor's vschema may not have been updated yet.
// Without a wait we fail non-deterministically since the previous vschema will not have
// the updated routing rules.
timeout := e.resolver.scatterConn.gateway.buffer.GetConfig().MinTimeBetweenFailovers / MaxBufferingRetries
timeout := e.resolver.scatterConn.gateway.buffer.GetConfig().MaxFailoverDuration / MaxBufferingRetries
if waitForNewerVSchema(ctx, e, lastVSchemaCreated, timeout) {
vs = e.VSchema()
lastVSchemaCreated = vs.GetCreated()
}
}
vs = e.VSchema() // We're going to replan either way, so let's use the current vschema.
}

vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv)
if err != nil {
return err
}

// 3: Create a plan for the query
// 3: Create a plan for the query.
// If we are retrying, it is likely that the routing rules have changed and hence we need to
// replan the query since the target keyspace of the resolved shards may have changed as a
// result of MoveTables. So we cannot reuse the plan from the first try.
// When buffering ends, many queries might be getting planned at the same time. Ideally we
// should be able to reuse plans once the first drained query has been planned. For now, we
// punt on this and choose not to prematurely optimize since it is not clear how much caching
// will help and if it will result in hard-to-track edge cases.

// result of MoveTables SwitchTraffic which does a RebuildSrvVSchema which in turn causes
// the vtgate to clear the cached plans when processing the new serving vschema.
// When buffering ends, many queries might be getting planned at the same time and we then
// take full advantage of the cached plan.
plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats)
execStart := e.logPlanningFinished(logStats, plan)

Expand All @@ -140,7 +138,7 @@ func (e *Executor) newExecute(
safeSession.ClearWarnings()
}

// add any warnings that the planner wants to add
// Add any warnings that the planner wants to add.
for _, warning := range plan.Warnings {
safeSession.RecordWarning(warning)
}
Expand All @@ -153,14 +151,14 @@ func (e *Executor) newExecute(
return recResult(plan.Type, result)
}

// 4: Prepare for execution
// 4: Prepare for execution.
err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession)
if err != nil {
logStats.Error = err
return err
}

// 5: Execute the plan and retry if needed
// 5: Execute the plan.
if plan.Instructions.NeedsTransaction() {
err = e.insideTransaction(ctx, safeSession, logStats,
func() error {
Expand All @@ -174,6 +172,7 @@ func (e *Executor) newExecute(
return err
}

// 6: Retry if needed.
rootCause := vterrors.RootCause(err)
if rootCause != nil && strings.Contains(rootCause.Error(), "enforce denied tables") {
log.V(2).Infof("Retry: %d, will retry query %s due to %v", try, query, err)
Expand Down

0 comments on commit 1f81b8a

Please sign in to comment.