Skip to content

Commit

Permalink
Remove stallHandler related code
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed May 28, 2024
1 parent d2fea8d commit c666b14
Show file tree
Hide file tree
Showing 10 changed files with 6 additions and 289 deletions.
1 change: 0 additions & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ Flags:
--v Level log level for V logs
-v, --version print binary version
--vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging
--vplayer-progress-deadline duration At what point, without having been able to successfully replicate a pending batch of events, should we consider the vplayer stalled; producing an error and log message and restarting the player.
--vreplication-parallel-insert-workers int Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase. (default 1)
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
Expand Down
1 change: 0 additions & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ Flags:
--v Level log level for V logs
-v, --version print binary version
--vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging
--vplayer-progress-deadline duration At what point, without having been able to successfully replicate a pending batch of events, should we consider the vplayer stalled; producing an error and log message and restarting the player.
--vreplication-parallel-insert-workers int Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase. (default 1)
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ func TestMain(m *testing.M) {
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
"--vplayer-progress-deadline", "15s",
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
4 changes: 1 addition & 3 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ var (
"--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{
"--vplayer-progress-deadline", "15s",
}
extraVTTabletArgs = []string{}

parallelInsertWorkers = "--vreplication-parallel-insert-workers=4"

Expand Down
7 changes: 0 additions & 7 deletions go/test/utils/noleak.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ func ensureNoGoroutines() error {
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/logutil.(*ThrottledLogger).log.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.initThrottleTicker.func1.1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.NewBackgroundClient.initThrottleTicker.func1.1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/pools/smartconnpool.(*ConnPool[...]).runWorker.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/stats.(*Rates).track"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/timer.(*Timer).run"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/srvtopo.NewResilientServer.NewSrvVSchemaWatcher.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/srvtopo.NewResilientServer.NewSrvVSchemaWatcher.func2"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/topo/memorytopo.(*Conn).Watch.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/topo.(*Server).WatchSrvVSchema.func1"),
goleak.IgnoreTopFunction("testing.tRunner.func1"),
}

Expand Down
3 changes: 0 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ func registerVReplicationFlags(fs *pflag.FlagSet) {
fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table")

fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.")

// At what point should we consider a vplayer to be stuck, produce an error, and retry?
fs.DurationVar(&vplayerProgressDeadline, "vplayer-progress-deadline", vplayerProgressDeadline, "At what point, without having been able to successfully replicate a pending batch of events, should we consider the vplayer stalled; producing an error and log message and restarting the player.")
}

func init() {
Expand Down
115 changes: 2 additions & 113 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"math"
"strconv"
"strings"
"sync/atomic"
"time"

"vitess.io/vitess/go/mysql/replication"
Expand All @@ -39,9 +38,6 @@ import (
)

var (
// At what point should we consider the vplayer to be stalled and return an error.
vplayerProgressDeadline = time.Duration(0) // Disabled by default.

// The error to return when we have detected a stall in the vplayer.
ErrVPlayerStalled = fmt.Errorf("progress stalled; vplayer was unable to replicate the transaction in a timely manner; examine the target mysqld instance health and the replicated queries' EXPLAIN output to see why queries are taking unusually long")
)
Expand Down Expand Up @@ -96,11 +92,6 @@ type vplayer struct {
// foreignKeyChecksStateInitialized is set to true once we have initialized the foreignKeyChecksEnabled.
// The initialization is done on the first row event that this vplayer sees.
foreignKeyChecksStateInitialized bool

// stallHandler is used to detect stalls when applying replicated user
// transactions and break out of the stall with a meaningful user error
// and log message, allowing for another retry attempt.
stallHandler *stallHandler
}

// NoForeignKeyCheckFlagBitmask is the bitmask for the 2nd bit (least significant) of the flags in a binlog row event.
Expand Down Expand Up @@ -131,15 +122,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return vr.dbClient.ExecuteWithRetry(ctx, sql)
}
stallHandler := newStallHandler(vplayerProgressDeadline, nil)
commitFunc := func() error {
// Explicit commits are only done when we are processing a batch of replicated
// queries and NOT for heartbeats or when simply updating the position. So we
// stop the timer here.
if err := vr.dbClient.Commit(); err != nil {
return err
}
return stallHandler.stopTimer()
return vr.dbClient.Commit()
}
batchMode := false
if vttablet.VReplicationExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
Expand Down Expand Up @@ -170,13 +154,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch
}
commitFunc = func() error {
// Explicit commits are only done when we are processing a batch of replicated
// queries and NOT for heartbeats or when simply updating the position. So we
// stop timer here.
if err := vr.dbClient.CommitTrxQueryBatch(); err != nil { // Commit the current trx batch
return err
}
return stallHandler.stopTimer()
return vr.dbClient.CommitTrxQueryBatch()
}
vr.dbClient.maxBatchSize = maxAllowedPacket
}
Expand All @@ -195,7 +173,6 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
query: queryFunc,
commit: commitFunc,
batchMode: batchMode,
stallHandler: stallHandler,
}
}

Expand Down Expand Up @@ -295,8 +272,6 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
}()

applyErr := make(chan error, 1)
vp.stallHandler.fire = applyErr
defer vp.stallHandler.stopTimer()
go func() {
applyErr <- vp.applyEvents(ctx, relay)
}()
Expand Down Expand Up @@ -631,9 +606,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err := vp.vr.dbClient.Begin(); err != nil {
return err
}
if err := vp.stallHandler.startTimer(); err != nil {
return err
}
}

if !vp.vr.dbClient.InTransaction {
Expand All @@ -655,9 +627,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err := vp.vr.dbClient.Begin(); err != nil {
return err
}
if err := vp.stallHandler.startTimer(); err != nil {
return err
}
tplan, err := vp.replicatorPlan.buildExecutionPlan(event.FieldEvent)
if err != nil {
return err
Expand All @@ -678,9 +647,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err := vp.vr.dbClient.Begin(); err != nil {
return err
}
if err := vp.stallHandler.startTimer(); err != nil {
return err
}
if err := vp.applyStmtEvent(ctx, event); err != nil {
return err
}
Expand All @@ -691,9 +657,6 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err := vp.vr.dbClient.Begin(); err != nil {
return err
}
if err := vp.stallHandler.startTimer(); err != nil {
return err
}
if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil {
log.Infof("Error applying row event: %s", err.Error())
return err
Expand Down Expand Up @@ -838,77 +801,3 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m

return nil
}

// stallHandler is used to monitor for vplayer stalls and trigger an error
// when detected. This is used today to detect when a vplayer is not able to
// commit/complete a replicated user transaction within a configured period of
// time. It offers a lock-free implementation that is idempotent; you can call
// startTimer()/stopTimer() as many times as you like and in any order -- it
// will ensure that there's only ever 0 or 1 timers/goroutines running at a
// time. When it is already running, calls to startTimer() will only reset
// the timer's deadline.
type stallHandler struct {
timer atomic.Pointer[time.Timer]
deadline time.Duration
fire chan error
stop chan struct{}
}

// newStallHandler initializes a stall handler. You should call stopTimer()
// in a defer from the same function where you initalize a new stallHandler.
func newStallHandler(dl time.Duration, ch chan error) *stallHandler {
return &stallHandler{
deadline: dl,
fire: ch,
stop: make(chan struct{}),
}
}

// startTimer starts the timer if it's not already running and it otherwise
// resets the timer's deadline when it it is already running.
func (sh *stallHandler) startTimer() error {
if sh == nil || sh.deadline == 0 { // Stall handling is disabled
return nil
}
// If the timer has not been initialized yet, then do so.
if swapped := sh.timer.CompareAndSwap(nil, time.NewTimer(sh.deadline)); !swapped {
// Otherwise, reset the timer's deadline.
if sh.timer.Load().Reset(sh.deadline) {
// The timer was already running, so be sure the channel is drained.
select {
case <-sh.timer.Load().C:
default:
}
// The timer goroutine was already running, so now that we've reset the
// timer's deadline we're done.
return nil
}
}
go func() {
select {
case <-sh.timer.Load().C: // The timer expired
sh.fire <- vterrors.Wrapf(ErrVPlayerStalled,
"failed to commit transaction batch before the configured --vplayer-progress-deadline of %v", vplayerProgressDeadline)
case <-sh.stop: // The timer was stopped
}
}()
return nil
}

// stopTimer stops the timer if it's currently running.
func (sh *stallHandler) stopTimer() error {
if sh == nil || sh.deadline == 0 || sh.timer.Load() == nil { // Stall handling is currently disabled
return nil
}
if sh.timer.Load().Stop() {
// It was running, so signal the goroutine to stop.
sh.stop <- struct{}{}
return nil
}
// It wasn't running, so be sure that the channel is drained.
select {
case <-sh.timer.Load().C:
default:
}
return nil
}
32 changes: 2 additions & 30 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3330,19 +3330,15 @@ func TestPlayerStalls(t *testing.T) {
log.Errorf = logger.Errorf

ovmhu := vreplicationMinimumHeartbeatUpdateInterval
ogvpt := vplayerProgressDeadline
orlmi := relayLogMaxItems
ord := retryDelay
defer func() {
log.Errorf = ole
vreplicationMinimumHeartbeatUpdateInterval = ovmhu
vplayerProgressDeadline = ogvpt
relayLogMaxItems = orlmi
retryDelay = ord
}()

// Shorten the deadline for the test.
vplayerProgressDeadline = 5 * time.Second
// Shorten the time for a required heartbeat recording for the test.
vreplicationMinimumHeartbeatUpdateInterval = 5
// So each relay log batch will be a single statement transaction.
Expand All @@ -3355,7 +3351,7 @@ func TestPlayerStalls(t *testing.T) {
// A channel to communicate across goroutines.
done := make(chan struct{})

testTimeout := vplayerProgressDeadline * 10
testTimeout := time.Duration(int64(vreplicationMinimumHeartbeatUpdateInterval) * int64(10*time.Second))

execStatements(t, []string{
"create table t1(id bigint, val1 varchar(1000), primary key(id))",
Expand All @@ -3382,34 +3378,10 @@ func TestPlayerStalls(t *testing.T) {
postFunc func()
expectQueries bool
}{
{
name: "stall in vplayer with statements",
input: []string{
"set @@session.binlog_format='STATEMENT'", // As we are using the sleep function in the query to simulate a stall
"insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", // This should be the only query that gets replicated
// This will cause a stall in the vplayer.
fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(vplayerProgressDeadline.Seconds()+5)),
},
expectQueries: true,
output: qh.Expect(
"insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')",
// This will cause a stall to be detected in the vplayer. This is
// what we want in the end, our improved error message (which also
// gets logged).
fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(vplayerProgressDeadline.Seconds()+5)),
"/update _vt.vreplication set message=.*progress stalled.*",
),
postFunc: func() {
time.Sleep(vplayerProgressDeadline)
log.Flush()
require.Contains(t, logger.String(), "failed to commit transaction batch before the configured", "expected log message not found")
execStatements(t, []string{"set @@session.binlog_format='ROW'"})
},
},
{
name: "stall in vplayer with rows",
input: []string{
fmt.Sprintf("set @@session.innodb_lock_wait_timeout=%d", int64(vplayerProgressDeadline.Seconds()+5)),
fmt.Sprintf("set @@session.innodb_lock_wait_timeout=%d", vreplicationMinimumHeartbeatUpdateInterval+5),
"insert into t1(id, val1) values (10, 'mmm'), (11, 'nnn'), (12, 'ooo')",
"update t1 set val1 = 'yyy' where id = 10",
},
Expand Down
Loading

0 comments on commit c666b14

Please sign in to comment.