Skip to content

Commit

Permalink
Only use the stall detection for replicated user events
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed May 17, 2024
1 parent 99e2d86 commit d4517f2
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 71 deletions.
13 changes: 4 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/relaylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newRelayLog(ctx context.Context, maxItems, maxSize int) *relayLog {
rl.canAccept.L = &rl.mu
rl.hasItems.L = &rl.mu

// Any time the context is done wake up all waiters to make them exit.
// Any time context is done, wake up all waiters to make them exit.
go func() {
<-ctx.Done()
rl.mu.Lock()
Expand All @@ -64,7 +64,7 @@ func newRelayLog(ctx context.Context, maxItems, maxSize int) *relayLog {
return rl
}

// Send writes events to the relay log.
// Send writes events to the relay log
func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
rl.mu.Lock()
defer rl.mu.Unlock()
Expand All @@ -74,19 +74,17 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
}
for rl.curSize > rl.maxSize || len(rl.items) >= rl.maxItems {
rl.canAccept.Wait()
// See if we should exit.
if err := rl.checkDone(); err != nil {
return err
}
}
rl.items = append(rl.items, events)
evsize := eventsSize(events)
rl.curSize += evsize
rl.curSize += eventsSize(events)
rl.hasItems.Broadcast()
return nil
}

// Fetch returns all existing items in the relay log, and empties the log.
// Fetch returns all existing items in the relay log, and empties the log
func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) {
rl.mu.Lock()
defer rl.mu.Unlock()
Expand All @@ -98,7 +96,6 @@ func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) {
defer cancelTimer()
for len(rl.items) == 0 && !rl.timedout {
rl.hasItems.Wait()
// See if we should exit.
if err := rl.checkDone(); err != nil {
return nil, err
}
Expand All @@ -111,8 +108,6 @@ func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) {
return items, nil
}

// checkDone checks to see if we've encountered a fatal error and should thus end our
// work and return the error back to the vplayer.
func (rl *relayLog) checkDone() error {
select {
case <-rl.ctx.Done():
Expand Down
137 changes: 111 additions & 26 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"strconv"
"strings"
"sync/atomic"
"time"

"vitess.io/vitess/go/mysql/replication"
Expand All @@ -40,8 +41,10 @@ var (
// At what point should we consider the vplayer to be stalled and return an error.
vplayerProgressTimeout = 2 * time.Minute

// The error to return when we haven't made progress for the timeout.
// The error to return when we have detected a stall in the vplayer.
ErrVPlayerProgressTimeout = fmt.Errorf("progress stalled; vplayer was likely unable to replicate the previous log content's transaction in a timely manner; examine the target mysqld instance health and the replicated queries' EXPLAIN output to see why queries are taking unusually long")

debugMode atomic.Bool
)

// vplayer replays binlog events by pulling them from a vstreamer.
Expand Down Expand Up @@ -95,9 +98,10 @@ type vplayer struct {
// The initialization is done on the first row event that this vplayer sees.
foreignKeyChecksStateInitialized bool

// progressTimer is reset every time that we've made progress. If this hits the progressTimeout then we
// end the vplayer and return ErrVPlayerProgressTimeout.
progressTimer *time.Timer
// stallHandler is used to detect stalls when applying replicated user
// transactions and break out of the stall with a meaningful user error
// and log message, allowing for another retry attempt.
stallHandler *stallHandler
}

// NoForeignKeyCheckFlagBitmask is the bitmask for the 2nd bit (least significant) of the flags in a binlog row event.
Expand Down Expand Up @@ -128,8 +132,15 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return vr.dbClient.ExecuteWithRetry(ctx, sql)
}
stallHandler := newStallHandler(vplayerProgressTimeout, nil)
commitFunc := func() error {
return vr.dbClient.Commit()
// Explicit commits are only done when we are processing a batch of replicated
// queries and NOT for heartbeats or when simply updating the position. So we
// stop the timer here.
if err := vr.dbClient.Commit(); err != nil {
return err
}
return stallHandler.stopTimer()
}
batchMode := false
if vttablet.VReplicationExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
Expand Down Expand Up @@ -160,13 +171,17 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch
}
commitFunc = func() error {
return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch
// Explicit commits are only done when we are processing a batch of replicated
// queries and NOT for heartbeats or when simply updating the position. So we
// stop the timer here.
if err := vr.dbClient.CommitTrxQueryBatch(); err != nil { // Commit the current trx batch
return err
}
return stallHandler.stopTimer()
}
vr.dbClient.maxBatchSize = maxAllowedPacket
}

progressTimer := time.NewTimer(vplayerProgressTimeout)

return &vplayer{
vr: vr,
startPos: settings.StartPos,
Expand All @@ -181,7 +196,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
query: queryFunc,
commit: commitFunc,
batchMode: batchMode,
progressTimer: progressTimer,
stallHandler: stallHandler,
}
}

Expand Down Expand Up @@ -281,21 +296,11 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
}()

applyErr := make(chan error, 1)
vp.stallHandler.fireChan = applyErr
go func() {
applyErr <- vp.applyEvents(ctx, relay)
}()

go func() {
select {
case <-ctx.Done():
case <-vp.progressTimer.C:
applyErr <- ErrVPlayerProgressTimeout
}
}()
defer func() {
vp.progressTimer.Stop()
}()

select {
case err := <-applyErr:
defer func() {
Expand Down Expand Up @@ -394,17 +399,12 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
}

// updatePos should get called at a minimum of vreplicationMinimumHeartbeatUpdateInterval.
// If it's not, then the workflow is stuck and we should generate an error in order to
// alert the operator and give us a chance to get out of the stuck state and continue on
// retry.
func (vp *vplayer) updatePos(ctx context.Context, ts int64) (posReached bool, err error) {
vp.numAccumulatedHeartbeats = 0
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get(), vreplicationStoreCompressedGTID)
if _, err := vp.query(ctx, update); err != nil {
return false, fmt.Errorf("error %v updating position", err)
}
log.Errorf("Position updated to %v, resetting progress timer", vp.pos)
vp.progressTimer.Reset(vplayerProgressTimeout)
vp.unsavedEvent = nil
vp.timeLastSaved = time.Now()
vp.vr.stats.SetLastPosition(vp.pos)
Expand Down Expand Up @@ -620,9 +620,15 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
// No-op: begin is called as needed.
case binlogdatapb.VEventType_COMMIT:
if mustSave {
if debugMode.Load() {
log.Errorf("Starting transaction (commit)")
}
if err := vp.vr.dbClient.Begin(); err != nil {
return err
}
if err := vp.stallHandler.startTimer(); err != nil {
return err
}
}

if !vp.vr.dbClient.InTransaction {
Expand All @@ -641,9 +647,15 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return io.EOF
}
case binlogdatapb.VEventType_FIELD:
if debugMode.Load() {
log.Errorf("Starting transaction (field event)")
}
if err := vp.vr.dbClient.Begin(); err != nil {
return err
}
if err := vp.stallHandler.startTimer(); err != nil {
return err
}
tplan, err := vp.replicatorPlan.buildExecutionPlan(event.FieldEvent)
if err != nil {
return err
Expand All @@ -661,24 +673,40 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
// If the event is for one of the AWS RDS "special" or pt-table-checksum tables, we skip
if !strings.Contains(sql, " mysql.rds_") && !strings.Contains(sql, " percona.checksums") {
// This is a player using statement based replication
if debugMode.Load() {
log.Errorf("starting transaction (statement)")
}
if err := vp.vr.dbClient.Begin(); err != nil {
return err
}
if err := vp.stallHandler.startTimer(); err != nil {
return err
}
if debugMode.Load() {
log.Errorf("executing statement: %s", sql)
}
if err := vp.applyStmtEvent(ctx, event); err != nil {
return err
}
stats.Send(sql)
}
case binlogdatapb.VEventType_ROW:
// This player is configured for row based replication
if debugMode.Load() {
log.Errorf("starting transaction (row)")
}
if err := vp.vr.dbClient.Begin(); err != nil {
return err
}
if err := vp.stallHandler.startTimer(); err != nil {
return err
}
if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil {
log.Infof("Error applying row event: %s", err.Error())
return err
}
//Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed time for the Row event
// Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed
// time for the Row event.
stats.Send(fmt.Sprintf("%v", event.RowEvent))
case binlogdatapb.VEventType_OTHER:
if vp.vr.dbClient.InTransaction {
Expand Down Expand Up @@ -817,3 +845,60 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m

return nil
}

// stallHandler detects stalls while a replicated user transaction is being
// applied. A timer is armed by startTimer and disarmed by stopTimer; if the
// timer expires before it is disarmed, ErrVPlayerProgressTimeout is sent on
// fireChan.
//
// At most one timer (and one watcher goroutine) is armed at any time. The
// fields are not protected by a lock, so startTimer and stopTimer must be
// called from a single goroutine.
// NOTE(review): this assumes all callers run on the goroutine that applies
// events — confirm against the callers.
type stallHandler struct {
	// timer is the currently armed timer, or nil when no timer is armed.
	timer *time.Timer
	// timeout is how long an armed timer runs before a stall is reported.
	timeout time.Duration
	// fireChan receives ErrVPlayerProgressTimeout when an armed timer expires.
	// It is captured when the timer is armed, so it must be set before
	// startTimer is called.
	fireChan chan error
	// stopChan is closed by stopTimer to release the goroutine watching the
	// currently armed timer. A fresh channel is created every time a timer is
	// armed, so it is nil while no timer is armed.
	stopChan chan struct{}
}

// newStallHandler returns a stallHandler that reports a stall on ch when an
// armed timer runs for longer than to. ch may be nil and assigned to the
// fireChan field later, as long as that happens before startTimer is called.
func newStallHandler(to time.Duration, ch chan error) *stallHandler {
	return &stallHandler{
		timeout:  to,
		fireChan: ch,
	}
}

// startTimer arms the stall timer. If a timer is already armed the call is a
// no-op, so the deadline is measured from the first call after the last
// stopTimer and repeated calls do not pile up timers and goroutines.
//
// It returns an error only when called on a nil receiver.
func (sh *stallHandler) startTimer() error {
	if sh == nil {
		if debugMode.Load() {
			log.Errorf("stallHandler is nil in startTimer")
		}
		return fmt.Errorf("stallHandler is nil")
	}
	if sh.timer != nil {
		// Already armed: keep the existing deadline rather than starting a
		// second timer that stopTimer would never be able to disarm.
		return nil
	}
	if debugMode.Load() {
		log.Errorf("Starting progress timer at %v", time.Now())
	}

	// The goroutine only touches these locals, never the struct fields, so it
	// cannot race with later startTimer/stopTimer calls.
	timer := time.NewTimer(sh.timeout)
	stop := make(chan struct{})
	fire := sh.fireChan
	sh.timer, sh.stopChan = timer, stop

	go func() {
		select {
		case <-timer.C:
		case <-stop:
			return
		}
		// The timer may have expired at the same moment it was stopped; in
		// that case the work completed and there is no stall to report.
		select {
		case <-stop:
			return
		default:
		}
		// Deliver the stall error, but let stopTimer release us if nobody is
		// able to receive it (e.g. a nil or full fireChan).
		select {
		case fire <- ErrVPlayerProgressTimeout:
		case <-stop:
		}
	}()
	return nil
}

// stopTimer disarms the stall timer armed by startTimer and releases its
// watcher goroutine. It never blocks, and it is a no-op when no timer is
// armed.
//
// It returns an error only when called on a nil receiver.
func (sh *stallHandler) stopTimer() error {
	if sh == nil {
		if debugMode.Load() {
			log.Errorf("stallHandler is nil in stopTimer")
		}
		return fmt.Errorf("stallHandler is nil")
	}
	if sh.timer == nil {
		if debugMode.Load() {
			log.Errorf("stallHandler.timer is nil in stopTimer")
		}
		return nil
	}
	if debugMode.Load() {
		log.Errorf("Stopping progress timer at %v", time.Now())
	}
	sh.timer.Stop()
	// Closing (rather than sending) can never block, regardless of whether
	// the watcher goroutine is still running or has already exited.
	if sh.stopChan != nil {
		close(sh.stopChan)
		sh.stopChan = nil
	}
	sh.timer = nil
	return nil
}
Loading

0 comments on commit d4517f2

Please sign in to comment.