From 1f81b8a57273710a02e9ce460f72ebf3abf3bd92 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 11 Apr 2024 18:12:14 -0400 Subject: [PATCH] Update e2e test Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/cluster_test.go | 5 +- go/test/endtoend/vreplication/helper_test.go | 117 ++++++++++-------- .../vreplication/movetables_buffering_test.go | 9 +- .../vreplication/partial_movetables_test.go | 6 + go/vt/vtgate/plan_execute.go | 27 ++-- 5 files changed, 97 insertions(+), 67 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 13aab15a18c..ce8c0bc916c 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -56,8 +56,9 @@ var ( sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName)) mainClusterConfig *ClusterConfig externalClusterConfig *ClusterConfig - extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr, - "--buffer_size", "100000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr} + extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(), + "--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(), + "--buffer_drain_concurrency", "10"} extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"} // This variable can be used within specific tests to alter vttablet behavior extraVTTabletArgs = []string{} diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index b5310822d0c..de534c5527a 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -18,17 +18,18 @@ package vreplication import ( "context" - "crypto/rand" "encoding/hex" "encoding/json" "fmt" "io" + "math/rand" "net/http" "os" "os/exec" "regexp" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -121,9 +122,10 @@ func getConnectionNoError(t *testing.T, hostname string, port int) *mysql.Conn { func getConnection(t *testing.T, hostname string, port int) *mysql.Conn { vtParams := mysql.ConnParams{ - Host: hostname, - Port: port, - Uname: "vt_dba", + Host: hostname, + Port: port, + Uname: "vt_dba", + ConnectTimeoutMs: 1000, } ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) @@ -803,48 +805,51 @@ func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int { } const ( - loadTestBufferingWindowDurationStr = "30s" - loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr - loadTestWaitForCancel = 30 * time.Second - loadTestWaitBetweenQueries = 2 * time.Millisecond + loadTestBufferingWindowDuration = 10 * time.Second + loadTestAvgWaitBetweenQueries = 500 * time.Microsecond + loadTestDefaultConnections = 100 ) type loadGenerator struct { - t *testing.T - vc *VitessCluster - ctx context.Context - cancel context.CancelFunc + t *testing.T + vc *VitessCluster + ctx context.Context + cancel context.CancelFunc + connections int + wg sync.WaitGroup } func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator { return &loadGenerator{ - t: t, - vc: vc, + t: t, + vc: vc, + connections: loadTestDefaultConnections, } } 
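Note on the load-generator rework below: the old goroutine-per-query loop is replaced with a bounded pool of long-lived VTGate connections, torn down via context cancellation plus a WaitGroup so stop() can assert on the counters only after every worker has exited. A minimal, self-contained sketch of that pattern follows; maxConns, active, and sent are illustrative stand-ins, not the test's actual helpers.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	const maxConns = 4 // stands in for loadGenerator.connections
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	var active, sent atomic.Int64

	// Spawner: keep up to maxConns workers alive, each owning one connection.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for ctx.Err() == nil {
			if active.Load() < maxConns {
				active.Add(1)
				wg.Add(1)
				go func() {
					defer wg.Done()
					defer active.Add(-1)
					// Worker: reuse the same "connection" for many queries.
					for ctx.Err() == nil {
						sent.Add(1)                  // stand-in for conn.ExecuteFetch(...)
						time.Sleep(time.Millisecond) // jittered pacing in the real test
					}
				}()
			} else {
				time.Sleep(time.Millisecond) // at capacity; avoid a hot spin
			}
		}
	}()

	time.Sleep(50 * time.Millisecond)
	cancel()  // like loadGenerator.stop(): cancel the context...
	wg.Wait() // ...then wait for every worker to exit before reading counters
	fmt.Println("queries sent:", sent.Load())
}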
func (lg *loadGenerator) stop() { - time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched + time.Sleep(loadTestBufferingWindowDuration * 2) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched log.Infof("Canceling load") lg.cancel() - time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect + lg.wg.Wait() } func (lg *loadGenerator) start() { t := lg.t lg.ctx, lg.cancel = context.WithCancel(context.Background()) + var connectionCount atomic.Int64 var id int64 log.Infof("startLoad: starting") queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')" var totalQueries, successfulQueries int64 var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64 + lg.wg.Add(1) defer func() { - + defer lg.wg.Done() log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) }() - logOnce := true for { select { case <-lg.ctx.Done(): @@ -853,42 +858,56 @@ func (lg *loadGenerator) start() { deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) require.Equal(t, int64(0), deniedErrors) require.Equal(t, int64(0), otherErrors) + require.Equal(t, int64(0), reshardedErrors) require.Equal(t, totalQueries, successfulQueries) return default: - go func() { - conn := vc.GetVTGateConn(t) - defer conn.Close() - atomic.AddInt64(&id, 1) - query := fmt.Sprintf(queryTemplate, id, id) - _, err := conn.ExecuteFetch(query, 1, false) - atomic.AddInt64(&totalQueries, 1) - if err != nil { - sqlErr := err.(*sqlerror.SQLError) - if strings.Contains(strings.ToLower(err.Error()), "denied tables") { - log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err) - atomic.AddInt64(&deniedErrors, 1) - } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") { - // this can happen when a second keyspace is setup with the same tables, but there are no routing rules - // set yet by MoveTables. So we ignore these errors. 
- atomic.AddInt64(&ambiguousErrors, 1)
- } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
- atomic.AddInt64(&reshardedErrors, 1)
- } else if strings.Contains(strings.ToLower(err.Error()), "not found") {
- atomic.AddInt64(&tableNotFoundErrors, 1)
- } else {
- if logOnce {
- log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
- logOnce = false
+ if int(connectionCount.Load()) < lg.connections {
+ connectionCount.Add(1)
+ lg.wg.Add(1)
+ go func() {
+ defer lg.wg.Done()
+ defer connectionCount.Add(-1)
+ conn := vc.GetVTGateConn(t)
+ defer conn.Close()
+ for {
+ select {
+ case <-lg.ctx.Done():
+ return
+ default:
}
- atomic.AddInt64(&otherErrors, 1)
+ newID := atomic.AddInt64(&id, 1)
+ query := fmt.Sprintf(queryTemplate, newID, newID)
+ _, err := conn.ExecuteFetch(query, 1, false)
+ atomic.AddInt64(&totalQueries, 1)
+ if err != nil {
+ sqlErr := err.(*sqlerror.SQLError)
+ if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
+ if debugMode {
+ t.Logf("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
+ }
+ atomic.AddInt64(&deniedErrors, 1)
+ } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
+ // this can happen when a second keyspace is set up with the same tables, but there are no routing rules
+ // set yet by MoveTables. So we ignore these errors.
+ atomic.AddInt64(&ambiguousErrors, 1)
+ } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
+ atomic.AddInt64(&reshardedErrors, 1)
+ } else if strings.Contains(strings.ToLower(err.Error()), "not found") {
+ atomic.AddInt64(&tableNotFoundErrors, 1)
+ } else {
+ if debugMode {
+ t.Logf("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
+ }
+ atomic.AddInt64(&otherErrors, 1)
+ }
+ } else {
+ atomic.AddInt64(&successfulQueries, 1)
+ }
+ time.Sleep(time.Duration(2 * rand.Float64() * float64(loadTestAvgWaitBetweenQueries)))
}
- time.Sleep(loadTestWaitBetweenQueries)
- } else {
- atomic.AddInt64(&successfulQueries, 1)
- }
- }()
- time.Sleep(loadTestWaitBetweenQueries)
+ }()
+ }
}
}
}
diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go
index a977320ec4a..f456c32bfd5 100644
--- a/go/test/endtoend/vreplication/movetables_buffering_test.go
+++ b/go/test/endtoend/vreplication/movetables_buffering_test.go
@@ -2,6 +2,7 @@ package vreplication
 import (
 "testing"
+ "time"
 "github.com/stretchr/testify/require"
@@ -33,8 +34,12 @@ func TestMoveTablesBuffering(t *testing.T) {
 catchup(t, targetTab2, workflowName, "MoveTables")
 vdiffSideBySide(t, ksWorkflow, "")
 waitForLowLag(t, "customer", workflowName)
- tstWorkflowSwitchReads(t, "", "")
- tstWorkflowSwitchWrites(t)
+ for i := 0; i < 10; i++ {
+ tstWorkflowSwitchReadsAndWrites(t)
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
+ tstWorkflowReverseReadsAndWrites(t)
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
+ }
 log.Infof("SwitchWrites done")
 lg.stop()
diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go
index 2f0c7c71d29..4236bff95a3 100644
--- a/go/test/endtoend/vreplication/partial_movetables_test.go
+++ b/go/test/endtoend/vreplication/partial_movetables_test.go
@@ -20,6 +20,7 @@ import (
 "fmt"
 "strings"
 "testing"
+ "time"
 binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -67,10 +68,12 @@ func testCancel(t
*testing.T) { mt.SwitchReadsAndWrites() checkDenyList(targetKeyspace, false) checkDenyList(sourceKeyspace, true) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) mt.ReverseReadsAndWrites() checkDenyList(targetKeyspace, true) checkDenyList(sourceKeyspace, false) + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) mt.Cancel() checkDenyList(targetKeyspace, false) @@ -123,6 +126,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { catchup(t, targetTab80Dash, workflowName, "MoveTables") vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil) mt.SwitchReadsAndWrites() + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) mt.Complete() emptyGlobalRoutingRules := "{}\n" @@ -246,6 +250,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { // Switch all traffic for the shard mt80Dash.SwitchReadsAndWrites() + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) // Confirm global routing rules -- everything should still be routed // to the source side, customer, globally. @@ -331,6 +336,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { catchup(t, targetTabDash80, workflowName, "MoveTables") vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil) mtDash80.SwitchReadsAndWrites() + time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) // Confirm global routing rules: everything should still be routed // to the source side, customer, globally. diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 8a43d0e8568..4a49fd40ded 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -65,7 +65,7 @@ func (e *Executor) newExecute( execPlan planExec, // used when there is a plan to execute recResult txResult, // used when it's something simple like begin/commit/rollback/savepoint ) (err error) { - // 1: Prepare before planning and execution + // 1: Prepare before planning and execution. // Start an implicit transaction if necessary. err = e.startTxIfNecessary(ctx, safeSession) @@ -79,7 +79,7 @@ func (e *Executor) newExecute( query, comments := sqlparser.SplitMarginComments(sql) - // 2: Parse and Validate query + // 2: Parse and Validate query. stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser()) if err != nil { return err @@ -106,12 +106,12 @@ func (e *Executor) newExecute( // There is a race due to which the executor's vschema may not have been updated yet. // Without a wait we fail non-deterministically since the previous vschema will not have // the updated routing rules. - timeout := e.resolver.scatterConn.gateway.buffer.GetConfig().MinTimeBetweenFailovers / MaxBufferingRetries + timeout := e.resolver.scatterConn.gateway.buffer.GetConfig().MaxFailoverDuration / MaxBufferingRetries if waitForNewerVSchema(ctx, e, lastVSchemaCreated, timeout) { + vs = e.VSchema() lastVSchemaCreated = vs.GetCreated() } } - vs = e.VSchema() // We're going to replan either way, so let's use the current vschema. } vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) @@ -119,15 +119,13 @@ func (e *Executor) newExecute( return err } - // 3: Create a plan for the query + // 3: Create a plan for the query. // If we are retrying, it is likely that the routing rules have changed and hence we need to // replan the query since the target keyspace of the resolved shards may have changed as a - // result of MoveTables. 
So we cannot reuse the plan from the first try.
- // When buffering ends, many queries might be getting planned at the same time. Ideally we
- // should be able to reuse plans once the first drained query has been planned. For now, we
- // punt on this and choose not to prematurely optimize since it is not clear how much caching
- // will help and if it will result in hard-to-track edge cases.
-
+ // result of MoveTables SwitchTraffic, which does a RebuildSrvVSchema that in turn causes
+ // the vtgate to clear the cached plans when processing the new serving vschema.
+ // When buffering ends, many queries might be getting planned at the same time and we then
+ // take full advantage of the cached plan.
 plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats)
 execStart := e.logPlanningFinished(logStats, plan)
@@ -140,7 +138,7 @@ func (e *Executor) newExecute(
 safeSession.ClearWarnings()
 }
- // add any warnings that the planner wants to add
+ // Add any warnings that the planner wants to add.
 for _, warning := range plan.Warnings {
 safeSession.RecordWarning(warning)
 }
@@ -153,14 +151,14 @@ func (e *Executor) newExecute(
 return recResult(plan.Type, result)
 }
- // 4: Prepare for execution
+ // 4: Prepare for execution.
 err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession)
 if err != nil {
 logStats.Error = err
 return err
 }
- // 5: Execute the plan and retry if needed
+ // 5: Execute the plan.
 if plan.Instructions.NeedsTransaction() {
 err = e.insideTransaction(ctx, safeSession, logStats,
 func() error {
@@ -174,6 +172,7 @@ func (e *Executor) newExecute(
 return err
 }
+ // 6: Retry if needed.
 rootCause := vterrors.RootCause(err)
 if rootCause != nil && strings.Contains(rootCause.Error(), "enforce denied tables") {
 log.V(2).Infof("Retry: %d, will retry query %s due to %v", try, query, err)
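Net effect of the plan_execute.go changes above: each buffering retry now waits for a newer vschema for at most MaxFailoverDuration / MaxBufferingRetries, refreshes its snapshot only when one actually arrives, and replans on denied-tables errors. A minimal sketch of that loop under simplified assumptions; waitForNewerVSchema and runPlan here are stubs, not the executor's real signatures.

package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"
)

const maxBufferingRetries = 3 // mirrors vtgate's MaxBufferingRetries

// Stub: block until a vschema newer than lastCreated shows up, or time out.
func waitForNewerVSchema(ctx context.Context, lastCreated time.Time, timeout time.Duration) bool {
	select {
	case <-ctx.Done():
		return false
	case <-time.After(timeout / 2): // pretend a fresh vschema arrived in time
		return true
	}
}

// Stub: plan and execute the query once; fail the first attempt the way a
// mid-flight traffic switch would.
func runPlan(try int) error {
	if try == 0 {
		return errors.New("target: enforce denied tables")
	}
	return nil
}

func execute(ctx context.Context, maxFailoverDuration time.Duration) error {
	lastVSchemaCreated := time.Now()
	var err error
	for try := 0; try < maxBufferingRetries; try++ {
		if try > 0 {
			// Split the failover budget across retries so the total wait stays
			// within the window the buffer will hold queries.
			timeout := maxFailoverDuration / maxBufferingRetries
			if waitForNewerVSchema(ctx, lastVSchemaCreated, timeout) {
				lastVSchemaCreated = time.Now() // replan against the fresh vschema
			}
		}
		if err = runPlan(try); err == nil || !strings.Contains(err.Error(), "denied tables") {
			return err
		}
		// Denied-tables error: routing changed under us; loop to replan and retry.
	}
	return err
}

func main() {
	fmt.Println(execute(context.Background(), 20*time.Second)) // <nil> after one retry
}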