From c666b1442b216413bd36f31926489cc0581af511 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 May 2024 12:47:12 -0400 Subject: [PATCH] Remove stallHandler related code Signed-off-by: Matt Lord --- go/flags/endtoend/vtcombo.txt | 1 - go/flags/endtoend/vttablet.txt | 1 - .../onlineddl_vrepl_stress_suite_test.go | 1 - go/test/endtoend/vreplication/cluster_test.go | 4 +- go/test/utils/noleak.go | 7 - .../tabletmanager/vreplication/flags.go | 3 - .../tabletmanager/vreplication/vplayer.go | 115 +--------------- .../vreplication/vplayer_flaky_test.go | 32 +---- .../vreplication/vplayer_test.go | 129 ------------------ tools/unit_test_runner.sh | 2 +- 10 files changed, 6 insertions(+), 289 deletions(-) delete mode 100644 go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 3f8862542b5..fd09f940b76 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -402,7 +402,6 @@ Flags: --v Level log level for V logs -v, --version print binary version --vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging - --vplayer-progress-deadline duration At what point, without having been able to successfully replicate a pending batch of events, should we consider the vplayer stalled; producing an error and log message and restarting the player. --vreplication-parallel-insert-workers int Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase. (default 1) --vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s) --vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 2bb04b88c3d..38b30f46ffa 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -402,7 +402,6 @@ Flags: --v Level log level for V logs -v, --version print binary version --vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging - --vplayer-progress-deadline duration At what point, without having been able to successfully replicate a pending batch of events, should we consider the vplayer stalled; producing an error and log message and restarting the player. --vreplication-parallel-insert-workers int Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase. (default 1) --vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s) --vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000) diff --git a/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go b/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go index d15494c944a..a3fa676d40b 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go @@ -439,7 +439,6 @@ func TestMain(m *testing.M) { // Test VPlayer batching mode. fmt.Sprintf("--vreplication_experimental_flags=%d", vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching), - "--vplayer-progress-deadline", "15s", } clusterInstance.VtGateExtraArgs = []string{ "--ddl_strategy", "online", diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 13a5381cc80..ddd323f7d3f 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -61,9 +61,7 @@ var ( "--buffer_drain_concurrency", "10"} extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"} // This variable can be used within specific tests to alter vttablet behavior - extraVTTabletArgs = []string{ - "--vplayer-progress-deadline", "15s", - } + extraVTTabletArgs = []string{} parallelInsertWorkers = "--vreplication-parallel-insert-workers=4" diff --git a/go/test/utils/noleak.go b/go/test/utils/noleak.go index b14d71140f8..31d454ec789 100644 --- a/go/test/utils/noleak.go +++ b/go/test/utils/noleak.go @@ -81,13 +81,6 @@ func ensureNoGoroutines() error { goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/logutil.(*ThrottledLogger).log.func1"), goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.initThrottleTicker.func1.1"), goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.NewBackgroundClient.initThrottleTicker.func1.1"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/pools/smartconnpool.(*ConnPool[...]).runWorker.func1"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/stats.(*Rates).track"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/timer.(*Timer).run"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/srvtopo.NewResilientServer.NewSrvVSchemaWatcher.func1"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/srvtopo.NewResilientServer.NewSrvVSchemaWatcher.func2"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/topo/memorytopo.(*Conn).Watch.func1"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/topo.(*Server).WatchSrvVSchema.func1"), goleak.IgnoreTopFunction("testing.tRunner.func1"), } diff --git a/go/vt/vttablet/tabletmanager/vreplication/flags.go b/go/vt/vttablet/tabletmanager/vreplication/flags.go index 9a9863c90ca..e45158ab99a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/flags.go +++ b/go/vt/vttablet/tabletmanager/vreplication/flags.go @@ -59,9 +59,6 @@ func registerVReplicationFlags(fs *pflag.FlagSet) { fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table") fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.") - - // At what point should we consider a vplayer to be stuck, produce an error, and retry? - fs.DurationVar(&vplayerProgressDeadline, "vplayer-progress-deadline", vplayerProgressDeadline, "At what point, without having been able to successfully replicate a pending batch of events, should we consider the vplayer stalled; producing an error and log message and restarting the player.") } func init() { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 48fe441318d..2b7a48255a2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -24,7 +24,6 @@ import ( "math" "strconv" "strings" - "sync/atomic" "time" "vitess.io/vitess/go/mysql/replication" @@ -39,9 +38,6 @@ import ( ) var ( - // At what point should we consider the vplayer to be stalled and return an error. - vplayerProgressDeadline = time.Duration(0) // Disabled by default. - // The error to return when we have detected a stall in the vplayer. ErrVPlayerStalled = fmt.Errorf("progress stalled; vplayer was unable to replicate the transaction in a timely manner; examine the target mysqld instance health and the replicated queries' EXPLAIN output to see why queries are taking unusually long") ) @@ -96,11 +92,6 @@ type vplayer struct { // foreignKeyChecksStateInitialized is set to true once we have initialized the foreignKeyChecksEnabled. // The initialization is done on the first row event that this vplayer sees. foreignKeyChecksStateInitialized bool - - // stallHandler is used to detect stalls when applying replicated user - // transactions and break out of the stall with a meaningful user error - // and log message, allowing for another retry attempt. - stallHandler *stallHandler } // NoForeignKeyCheckFlagBitmask is the bitmask for the 2nd bit (least significant) of the flags in a binlog row event. @@ -131,15 +122,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { return vr.dbClient.ExecuteWithRetry(ctx, sql) } - stallHandler := newStallHandler(vplayerProgressDeadline, nil) commitFunc := func() error { - // Explicit commits are only done when we are processing a batch of replicated - // queries and NOT for heartbeats or when simply updating the position. So we - // stop the timer here. - if err := vr.dbClient.Commit(); err != nil { - return err - } - return stallHandler.stopTimer() + return vr.dbClient.Commit() } batchMode := false if vttablet.VReplicationExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { @@ -170,13 +154,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch } commitFunc = func() error { - // Explicit commits are only done when we are processing a batch of replicated - // queries and NOT for heartbeats or when simply updating the position. So we - // stop timer here. - if err := vr.dbClient.CommitTrxQueryBatch(); err != nil { // Commit the current trx batch - return err - } - return stallHandler.stopTimer() + return vr.dbClient.CommitTrxQueryBatch() } vr.dbClient.maxBatchSize = maxAllowedPacket } @@ -195,7 +173,6 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map query: queryFunc, commit: commitFunc, batchMode: batchMode, - stallHandler: stallHandler, } } @@ -295,8 +272,6 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) { }() applyErr := make(chan error, 1) - vp.stallHandler.fire = applyErr - defer vp.stallHandler.stopTimer() go func() { applyErr <- vp.applyEvents(ctx, relay) }() @@ -631,9 +606,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.vr.dbClient.Begin(); err != nil { return err } - if err := vp.stallHandler.startTimer(); err != nil { - return err - } } if !vp.vr.dbClient.InTransaction { @@ -655,9 +627,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.vr.dbClient.Begin(); err != nil { return err } - if err := vp.stallHandler.startTimer(); err != nil { - return err - } tplan, err := vp.replicatorPlan.buildExecutionPlan(event.FieldEvent) if err != nil { return err @@ -678,9 +647,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.vr.dbClient.Begin(); err != nil { return err } - if err := vp.stallHandler.startTimer(); err != nil { - return err - } if err := vp.applyStmtEvent(ctx, event); err != nil { return err } @@ -691,9 +657,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.vr.dbClient.Begin(); err != nil { return err } - if err := vp.stallHandler.startTimer(); err != nil { - return err - } if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil { log.Infof("Error applying row event: %s", err.Error()) return err @@ -838,77 +801,3 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return nil } - -// stallHandler is used to monitor for vplayer stalls and trigger an error -// when detected. This is used today to detect when a vplayer is not able to -// commit/complete a replicated user transaction within a configured period of -// time. It offers a lock-free implementation that is idempotent; you can call -// startTimer()/stopTimer() as many times as you like and in any order -- it -// will ensure that there's only ever 0 or 1 timers/goroutines running at a -// time. When it is already running, calls to startTimer() will only reset -// the timer's deadline. -type stallHandler struct { - timer atomic.Pointer[time.Timer] - deadline time.Duration - fire chan error - stop chan struct{} -} - -// newStallHandler initializes a stall handler. You should call stopTimer() -// in a defer from the same function where you initalize a new stallHandler. -func newStallHandler(dl time.Duration, ch chan error) *stallHandler { - return &stallHandler{ - deadline: dl, - fire: ch, - stop: make(chan struct{}), - } -} - -// startTimer starts the timer if it's not already running and it otherwise -// resets the timer's deadline when it it is already running. -func (sh *stallHandler) startTimer() error { - if sh == nil || sh.deadline == 0 { // Stall handling is disabled - return nil - } - // If the timer has not been initialized yet, then do so. - if swapped := sh.timer.CompareAndSwap(nil, time.NewTimer(sh.deadline)); !swapped { - // Otherwise, reset the timer's deadline. - if sh.timer.Load().Reset(sh.deadline) { - // The timer was already running, so be sure the channel is drained. - select { - case <-sh.timer.Load().C: - default: - } - // The timer goroutine was already running, so now that we've reset the - // timer's deadline we're done. - return nil - } - } - go func() { - select { - case <-sh.timer.Load().C: // The timer expired - sh.fire <- vterrors.Wrapf(ErrVPlayerStalled, - "failed to commit transaction batch before the configured --vplayer-progress-deadline of %v", vplayerProgressDeadline) - case <-sh.stop: // The timer was stopped - } - }() - return nil -} - -// stopTimer stops the timer if it's currently running. -func (sh *stallHandler) stopTimer() error { - if sh == nil || sh.deadline == 0 || sh.timer.Load() == nil { // Stall handling is currently disabled - return nil - } - if sh.timer.Load().Stop() { - // It was running, so signal the goroutine to stop. - sh.stop <- struct{}{} - return nil - } - // It wasn't running, so be sure that the channel is drained. - select { - case <-sh.timer.Load().C: - default: - } - return nil -} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index e4a9cff6398..3d3c2b3594a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -3330,19 +3330,15 @@ func TestPlayerStalls(t *testing.T) { log.Errorf = logger.Errorf ovmhu := vreplicationMinimumHeartbeatUpdateInterval - ogvpt := vplayerProgressDeadline orlmi := relayLogMaxItems ord := retryDelay defer func() { log.Errorf = ole vreplicationMinimumHeartbeatUpdateInterval = ovmhu - vplayerProgressDeadline = ogvpt relayLogMaxItems = orlmi retryDelay = ord }() - // Shorten the deadline for the test. - vplayerProgressDeadline = 5 * time.Second // Shorten the time for a required heartbeat recording for the test. vreplicationMinimumHeartbeatUpdateInterval = 5 // So each relay log batch will be a single statement transaction. @@ -3355,7 +3351,7 @@ func TestPlayerStalls(t *testing.T) { // A channel to communicate across goroutines. done := make(chan struct{}) - testTimeout := vplayerProgressDeadline * 10 + testTimeout := time.Duration(int64(vreplicationMinimumHeartbeatUpdateInterval) * int64(10*time.Second)) execStatements(t, []string{ "create table t1(id bigint, val1 varchar(1000), primary key(id))", @@ -3382,34 +3378,10 @@ func TestPlayerStalls(t *testing.T) { postFunc func() expectQueries bool }{ - { - name: "stall in vplayer with statements", - input: []string{ - "set @@session.binlog_format='STATEMENT'", // As we are using the sleep function in the query to simulate a stall - "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", // This should be the only query that gets replicated - // This will cause a stall in the vplayer. - fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(vplayerProgressDeadline.Seconds()+5)), - }, - expectQueries: true, - output: qh.Expect( - "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", - // This will cause a stall to be detected in the vplayer. This is - // what we want in the end, our improved error message (which also - // gets logged). - fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(vplayerProgressDeadline.Seconds()+5)), - "/update _vt.vreplication set message=.*progress stalled.*", - ), - postFunc: func() { - time.Sleep(vplayerProgressDeadline) - log.Flush() - require.Contains(t, logger.String(), "failed to commit transaction batch before the configured", "expected log message not found") - execStatements(t, []string{"set @@session.binlog_format='ROW'"}) - }, - }, { name: "stall in vplayer with rows", input: []string{ - fmt.Sprintf("set @@session.innodb_lock_wait_timeout=%d", int64(vplayerProgressDeadline.Seconds()+5)), + fmt.Sprintf("set @@session.innodb_lock_wait_timeout=%d", vreplicationMinimumHeartbeatUpdateInterval+5), "insert into t1(id, val1) values (10, 'mmm'), (11, 'nnn'), (12, 'ooo')", "update t1 set val1 = 'yyy' where id = 10", }, diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go deleted file mode 100644 index 746d2b85630..00000000000 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ /dev/null @@ -1,129 +0,0 @@ -/* -Copyright 2024 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package vreplication - -import ( - "context" - "math/rand" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/test/utils" -) - -func TestStallHandler(t *testing.T) { - lctx := utils.LeakCheckContext(t) - tme := time.Duration(90 * time.Second) - ctx, cancel := context.WithTimeout(lctx, tme) - defer cancel() - - concurrency := 10000 - dl := time.Duration(10 * time.Second) - - tests := []struct { - name string - f func() - }{ - { - name: "Random concurrency", - f: func() { - ch := make(chan error) - sh := newStallHandler(dl, ch) - defer sh.stopTimer() // This should always be called in a defer from where it's created - wg := sync.WaitGroup{} - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond) - var err error - action := rand.Intn(2) - if action == 0 { - err = sh.startTimer() - } else { - err = sh.stopTimer() - } - require.NoError(t, err) - select { - case e := <-ch: - require.FailNow(t, "unexpected error", "error: %v", e) - case <-ctx.Done(): - default: - } - }() - } - wg.Wait() - }, - }, - { - name: "All stops", - f: func() { - ch := make(chan error) - sh := newStallHandler(dl, ch) - defer sh.stopTimer() // This should always be called in a defer from where it's created - wg := sync.WaitGroup{} - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - err := sh.stopTimer() - require.NoError(t, err) - select { - case e := <-ch: - require.FailNow(t, "unexpected error", "error: %v", e) - case <-ctx.Done(): - default: - } - }() - } - wg.Wait() - }, - }, - { - name: "All starts", - f: func() { - ch := make(chan error) - sh := newStallHandler(dl, ch) - defer sh.stopTimer() // This should always be called in a defer from where it's created - wg := sync.WaitGroup{} - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - err := sh.startTimer() - require.NoError(t, err) - select { - case e := <-ch: - require.FailNow(t, "unexpected error", "error: %v", e) - case <-ctx.Done(): - default: - } - }() - } - wg.Wait() - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.f() - }) - } -} diff --git a/tools/unit_test_runner.sh b/tools/unit_test_runner.sh index efcd03aec0c..242747fda16 100755 --- a/tools/unit_test_runner.sh +++ b/tools/unit_test_runner.sh @@ -84,7 +84,7 @@ for pkg in $flaky_tests; do max_attempts=3 attempt=1 # Set a timeout because some tests may deadlock when they flake. - until go test -timeout 5m $VT_GO_PARALLEL $pkg -v -count=1; do + until go test -timeout 3m $VT_GO_PARALLEL $pkg -v -count=1; do echo "FAILED (try $attempt/$max_attempts) in $pkg (return code $?). See above for errors." if [ $((++attempt)) -gt $max_attempts ]; then echo "ERROR: Flaky Go unit tests in package $pkg failed too often (after $max_attempts retries). Please reduce the flakiness."