From d4517f227475eedfee1b146be30ef7cc1bbfffa4 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 16 May 2024 19:50:54 -0400 Subject: [PATCH] Only use the stall detection for replicated user events Signed-off-by: Matt Lord --- .../tabletmanager/vreplication/relaylog.go | 13 +- .../tabletmanager/vreplication/vplayer.go | 137 ++++++++++++++---- .../vreplication/vplayer_flaky_test.go | 100 +++++++++---- .../vttablet/tabletserver/vstreamer/engine.go | 18 +-- 4 files changed, 197 insertions(+), 71 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go index 74f88eee590..c2eb9c4af83 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go @@ -53,7 +53,7 @@ func newRelayLog(ctx context.Context, maxItems, maxSize int) *relayLog { rl.canAccept.L = &rl.mu rl.hasItems.L = &rl.mu - // Any time the context is done wake up all waiters to make them exit. + // Any time context is done, wake up all waiters to make them exit. go func() { <-ctx.Done() rl.mu.Lock() @@ -64,7 +64,7 @@ func newRelayLog(ctx context.Context, maxItems, maxSize int) *relayLog { return rl } -// Send writes events to the relay log. +// Send writes events to the relay log func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error { rl.mu.Lock() defer rl.mu.Unlock() @@ -74,19 +74,17 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error { } for rl.curSize > rl.maxSize || len(rl.items) >= rl.maxItems { rl.canAccept.Wait() - // See if we should exit. if err := rl.checkDone(); err != nil { return err } } rl.items = append(rl.items, events) - evsize := eventsSize(events) - rl.curSize += evsize + rl.curSize += eventsSize(events) rl.hasItems.Broadcast() return nil } -// Fetch returns all existing items in the relay log, and empties the log. +// Fetch returns all existing items in the relay log, and empties the log func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) { rl.mu.Lock() defer rl.mu.Unlock() @@ -98,7 +96,6 @@ func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) { defer cancelTimer() for len(rl.items) == 0 && !rl.timedout { rl.hasItems.Wait() - // See if we should exit. if err := rl.checkDone(); err != nil { return nil, err } @@ -111,8 +108,6 @@ func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) { return items, nil } -// checkDone checks to see if we've encounterd a fatal error and should thus end our -// work and return the error back to the vplayer. func (rl *relayLog) checkDone() error { select { case <-rl.ctx.Done(): diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 1874e91c5ab..4c93376deec 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -24,6 +24,7 @@ import ( "math" "strconv" "strings" + "sync/atomic" "time" "vitess.io/vitess/go/mysql/replication" @@ -40,8 +41,10 @@ var ( // At what point should we consider the vplayer to be stalled and return an error. vplayerProgressTimeout = 2 * time.Minute - // The error to return when we haven't made progress for the timeout. + // The error to return when we have detected a stall in the vplayer. 
ErrVPlayerProgressTimeout = fmt.Errorf("progress stalled; vplayer was likely unable to replicate the previous log content's transaction in a timely manner; examine the target mysqld instance health and the replicated queries' EXPLAIN output to see why queries are taking unusually long") + + debugMode atomic.Bool ) // vplayer replays binlog events by pulling them from a vstreamer. @@ -95,9 +98,10 @@ type vplayer struct { // The initialization is done on the first row event that this vplayer sees. foreignKeyChecksStateInitialized bool - // progressTimer is reset every time that we've made progress. If this hits the progressTimeout then we - // end the vplayer and return ErrVPlayerProgressTimeout. - progressTimer *time.Timer + // stallHandler is used to detect stalls when applying replicated user + // transactions and break out of the stall with a meaningful user error + // and log message, allowing for another retry attempt. + stallHandler *stallHandler } // NoForeignKeyCheckFlagBitmask is the bitmask for the 2nd bit (least significant) of the flags in a binlog row event. @@ -128,8 +132,15 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { return vr.dbClient.ExecuteWithRetry(ctx, sql) } + stallHandler := newStallHandler(vplayerProgressTimeout, nil) commitFunc := func() error { - return vr.dbClient.Commit() + // Explicit commits are only done when we are processing a batch of replicated + // queries and NOT for heartbeats or when simply updating the position. So we + // stop the timer here. + if err := vr.dbClient.Commit(); err != nil { + return err + } + return stallHandler.stopTimer() } batchMode := false if vttablet.VReplicationExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { @@ -160,13 +171,17 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch } commitFunc = func() error { - return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch + // Explicit commits are only done when we are processing a batch of replicated + // queries and NOT for heartbeats or when simply updating the position. So we + // stop timer here. + if err := vr.dbClient.CommitTrxQueryBatch(); err != nil { // Commit the current trx batch + return err + } + return stallHandler.stopTimer() } vr.dbClient.maxBatchSize = maxAllowedPacket } - progressTimer := time.NewTimer(vplayerProgressTimeout) - return &vplayer{ vr: vr, startPos: settings.StartPos, @@ -181,7 +196,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map query: queryFunc, commit: commitFunc, batchMode: batchMode, - progressTimer: progressTimer, + stallHandler: stallHandler, } } @@ -281,21 +296,11 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) { }() applyErr := make(chan error, 1) + vp.stallHandler.fireChan = applyErr go func() { applyErr <- vp.applyEvents(ctx, relay) }() - go func() { - select { - case <-ctx.Done(): - case <-vp.progressTimer.C: - applyErr <- ErrVPlayerProgressTimeout - } - }() - defer func() { - vp.progressTimer.Stop() - }() - select { case err := <-applyErr: defer func() { @@ -394,17 +399,12 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row } // updatePos should get called at a minimum of vreplicationMinimumHeartbeatUpdateInterval. 
-// If it's not, then the workflow is stuck and we should generate an error in order to -// alert the operator and give us a chance to get out of the stuck state and continue on -// retry. func (vp *vplayer) updatePos(ctx context.Context, ts int64) (posReached bool, err error) { vp.numAccumulatedHeartbeats = 0 update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get(), vreplicationStoreCompressedGTID) if _, err := vp.query(ctx, update); err != nil { return false, fmt.Errorf("error %v updating position", err) } - log.Errorf("Position updated to %v, resetting progress timer", vp.pos) - vp.progressTimer.Reset(vplayerProgressTimeout) vp.unsavedEvent = nil vp.timeLastSaved = time.Now() vp.vr.stats.SetLastPosition(vp.pos) @@ -620,9 +620,15 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m // No-op: begin is called as needed. case binlogdatapb.VEventType_COMMIT: if mustSave { + if debugMode.Load() { + log.Errorf("Starting transaction (commit)") + } if err := vp.vr.dbClient.Begin(); err != nil { return err } + if err := vp.stallHandler.startTimer(); err != nil { + return err + } } if !vp.vr.dbClient.InTransaction { @@ -641,9 +647,15 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF } case binlogdatapb.VEventType_FIELD: + if debugMode.Load() { + log.Errorf("Starting transaction (field event)") + } if err := vp.vr.dbClient.Begin(); err != nil { return err } + if err := vp.stallHandler.startTimer(); err != nil { + return err + } tplan, err := vp.replicatorPlan.buildExecutionPlan(event.FieldEvent) if err != nil { return err @@ -661,9 +673,18 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m // If the event is for one of the AWS RDS "special" or pt-table-checksum tables, we skip if !strings.Contains(sql, " mysql.rds_") && !strings.Contains(sql, " percona.checksums") { // This is a player using statement based replication + if debugMode.Load() { + log.Errorf("starting transaction (statement)") + } if err := vp.vr.dbClient.Begin(); err != nil { return err } + if err := vp.stallHandler.startTimer(); err != nil { + return err + } + if debugMode.Load() { + log.Errorf("executing statement: %s", sql) + } if err := vp.applyStmtEvent(ctx, event); err != nil { return err } @@ -671,14 +692,21 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m } case binlogdatapb.VEventType_ROW: // This player is configured for row based replication + if debugMode.Load() { + log.Errorf("starting transaction (row)") + } if err := vp.vr.dbClient.Begin(); err != nil { return err } + if err := vp.stallHandler.startTimer(); err != nil { + return err + } if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil { log.Infof("Error applying row event: %s", err.Error()) return err } - //Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed time for the Row event + // Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed + // time for the Row event. 
stats.Send(fmt.Sprintf("%v", event.RowEvent)) case binlogdatapb.VEventType_OTHER: if vp.vr.dbClient.InTransaction { @@ -817,3 +845,60 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return nil } + +type stallHandler struct { + timer *time.Timer + timeout time.Duration + fireChan chan error + stopChan chan struct{} +} + +func newStallHandler(to time.Duration, ch chan error) *stallHandler { + return &stallHandler{ + timeout: to, + fireChan: ch, + stopChan: make(chan struct{}, 1), + } +} + +func (sh *stallHandler) startTimer() error { + if sh == nil { + if debugMode.Load() { + log.Errorf("stallHandler is nil in startTimer") + } + return fmt.Errorf("stallHandler is nil") + } + if debugMode.Load() { + log.Errorf("Starting progress timer at %v", time.Now()) + } + sh.timer = time.NewTimer(sh.timeout) + go func() { + select { + case <-sh.timer.C: + sh.fireChan <- ErrVPlayerProgressTimeout + case <-sh.stopChan: + } + }() + return nil +} + +func (sh *stallHandler) stopTimer() error { + if sh == nil { + if debugMode.Load() { + log.Errorf("stallHandler is nil in stopTimer") + } + return fmt.Errorf("stallHandler is nil") + } + if sh.timer == nil { + if debugMode.Load() { + log.Errorf("stallHandler.timer is nil in stopTimer") + } + return nil + } + if debugMode.Load() { + log.Errorf("Stopping progress timer at %v", time.Now()) + } + sh.timer.Stop() + sh.stopChan <- struct{}{} + return nil +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index e1cb30c85e8..e41cd845d8c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -3352,12 +3352,13 @@ func TestPlayerBatchMode(t *testing.T) { } } -// TestPlayerStalls confirms that the vplayer will detect a relay log IO stall -// and generate a meaningful error -- which is stored in the vreplication record -// and the vreplication_log table, as well as being logged -- when it does. +// TestPlayerStalls confirms that the vplayer will detect a stall and generate +// a meaningful error -- which is stored in the vreplication record and the +// vreplication_log table as well as being logged -- when it does. func TestPlayerStalls(t *testing.T) { defer deleteTablet(addTablet(100)) + debugMode.Store(true) ogvpt := vplayerProgressTimeout orlmi := relayLogMaxItems ord := retryDelay @@ -3365,21 +3366,21 @@ func TestPlayerStalls(t *testing.T) { vplayerProgressTimeout = ogvpt relayLogMaxItems = orlmi retryDelay = ord + debugMode.Store(false) }() - // Shorten the timeout for the test. With the default time the test would - // take 5+ minutes to run. - vplayerProgressTimeout = 10 * time.Second + // Shorten the timeout for the test. + vplayerProgressTimeout = 5 * time.Second // So each relay log batch will be a single statement transaction. relayLogMaxItems = 1 + // Don't retry the workflow if it goes into the error state. 
retryDelay = 10 * time.Minute - maxTimeToRetryError = 0 + maxTimeToRetryError = 1 * time.Second - testTimeout := vplayerProgressTimeout * 3 + testTimeout := vplayerProgressTimeout * 100 execStatements(t, []string{ - "set @@global.binlog_format='STATEMENT'", // As we are using the sleep function in the query to simulate a stall "create table t1(id bigint, val1 varchar(1000), primary key(id))", fmt.Sprintf("create table %s.t1(id bigint, val1 varchar(1000), primary key(id))", vrepldb), }) @@ -3399,25 +3400,72 @@ func TestPlayerStalls(t *testing.T) { cancel, _ := startVReplication(t, bls, "") defer cancel() - //stallSimulator := fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(testTimeout.Seconds())) - stallSimulator := "update t1 set val1 = concat(sleep (5), val1)" - - input := []string{ - "set @@session.binlog_format='STATEMENT'", // As we are using the sleep function in the query to simulate a stall - "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", // This should be the only query that gets replicated - stallSimulator, // This will cause a stall in the vplayer - "insert into t1(id, val1) values (4, 'ddd'), (5, 'eee'), (6, 'fff')", - "update t1 set val1 = 'zzz' where id = 1", + testcases := []struct { + name string + input []string + output qh.ExpectationSequencer + preFunc func() // This is run in a goroutine + postFunc func() + }{ + { + name: "stall in vplayer with statements", + input: []string{ + "set @@session.binlog_format='STATEMENT'", // As we are using the sleep function in the query to simulate a stall + "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", // This should be the only query that gets replicated + // This will cause a stall in the vplayer. + fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(vplayerProgressTimeout.Seconds()+5)), + "insert into t1(id, val1) values (4, 'ddd'), (5, 'eee'), (6, 'fff')", + "update t1 set val1 = 'zzz' where id = 1", + }, + output: qh.Expect( + "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", + // This will cause a stall to be detected in the vplayer. This is + // what we want in the end, our improved error message. This is also + // the same message that gets logged. + fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(vplayerProgressTimeout.Seconds()+5)), + "/update _vt.vreplication set message=.*progress stalled.*", + ), + postFunc: func() { + execStatements(t, []string{"set @@session.binlog_format='ROW'"}) + }, + }, + /* TODO: get this working + { + name: "stall in vplayer with rows", + input: []string{ + "insert into t1(id, val1) values (10, 'mmm'), (11, 'nnn'), (12, 'ooo')", + "update t1 set val1 = 'yyy' where id = 10", + }, + preFunc: func() { + dbc, err := env.Mysqld.GetAllPrivsConnection(context.Background()) + require.NoError(t, err) + defer dbc.Close() + stmt := fmt.Sprintf("lock table %s.t1 read; select sleep(%d); unlock tables", + vrepldb, int64(vplayerProgressTimeout.Seconds()+5)) + _, _, err = dbc.ExecuteFetchMulti(stmt, 1, false) + require.NoError(t, err) + }, + output: qh.Expect( + // Nothing should get replicated because of the table level lock held + // in the other connection from our preFunc. + "/update _vt.vreplication set message=.*progress stalled.*", + ), + }, + */ } - output := qh.Expect( - "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", - // This is what we want in the end, our improved error message. - // This is also the same message that gets logged. 
- "/update _vt.vreplication set message=.*progress stalled.*", - ) - execStatements(t, input) - expectNontxQueries(t, output, testTimeout) + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + execStatements(t, tc.input) + if tc.preFunc != nil { + go tc.preFunc() + } + expectNontxQueries(t, tc.output, testTimeout) + if tc.postFunc != nil { + tc.postFunc() + } + }) + } } func expectJSON(t *testing.T, table string, values [][]string, id int, exec func(ctx context.Context, query string) (*sqltypes.Result, error)) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index d62735a6d4a..501b3708eed 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -28,17 +28,16 @@ import ( "sync/atomic" "time" - "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/acl" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -557,7 +556,7 @@ func (vse *Engine) getInnoDBTrxHistoryLen(ctx context.Context, db dbconfigs.Conn return histLen } -// getMySQLReplicationLag attempts to get the seconds_behind_master value. +// getMySQLReplicationLag attempts to get the seconds_behind_source value. // If the value cannot be determined for any reason then -1 is returned, which // means "unknown" or "irrelevant" (meaning it's not actively replicating). func (vse *Engine) getMySQLReplicationLag(ctx context.Context, db dbconfigs.Connector) int64 { @@ -568,12 +567,11 @@ func (vse *Engine) getMySQLReplicationLag(ctx context.Context, db dbconfigs.Conn } defer conn.Close() - res, err := conn.ExecuteFetch(replicaLagQuery, 1, true) - if err != nil || len(res.Rows) != 1 || res.Rows[0] == nil { + status, err := conn.ShowReplicationStatus() + if err != nil { return lagSecs } - row := res.Named().Row() - return row.AsInt64("Seconds_Behind_Master", -1) + return int64(status.ReplicationLagSeconds) } // getMySQLEndpoint returns the host:port value for the vstreamer (MySQL) instance