diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index ddd323f7d3f..a758944d3d9 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -60,7 +60,7 @@ var ( "--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(), "--buffer_drain_concurrency", "10"} extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"} - // This variable can be used within specific tests to alter vttablet behavior + // This variable can be used within specific tests to alter vttablet behavior. extraVTTabletArgs = []string{} parallelInsertWorkers = "--vreplication-parallel-insert-workers=4" diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 04c4c8f3e41..d4253de27e7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -32,8 +32,6 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/sqlparser" - _flag "vitess.io/vitess/go/internal/flag" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" @@ -46,6 +44,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sidecardb" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -74,6 +73,7 @@ var ( testForeignKeyQueries = false testSetForeignKeyQueries = false doNotLogDBQueries = false + recvTimeout = 5 * time.Second ) type LogExpectation struct { @@ -492,14 +492,14 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu return qr, err } -func (dc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) { +func (dbc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) { queries, err := sqlparser.NewTestParser().SplitStatementToPieces(query) if err != nil { return nil, err } results := make([]*sqltypes.Result, 0, len(queries)) for _, query := range queries { - qr, err := dc.ExecuteFetch(query, maxrows) + qr, err := dbc.ExecuteFetch(query, maxrows) if err != nil { return nil, err } @@ -518,7 +518,7 @@ func expectDeleteQueries(t *testing.T) { "/delete from _vt.vreplication", "/delete from _vt.copy_state", "/delete from _vt.post_copy_action", - )) + ), recvTimeout) } func deleteAllVReplicationStreams(t *testing.T) { @@ -635,7 +635,7 @@ func expectDBClientQueries(t *testing.T, expectations qh.ExpectationSequence, sk )) } case <-time.After(5 * time.Second): - t.Fatalf("no query received") + require.FailNow(t, "no query received") failed = true } } @@ -656,7 +656,7 @@ func expectDBClientQueries(t *testing.T, expectations qh.ExpectationSequence, sk // expectNontxQueries disregards transactional statements like begin and commit. // It also disregards updates to _vt.vreplication. 
-func expectNontxQueries(t *testing.T, expectations qh.ExpectationSequence) {
+func expectNontxQueries(t *testing.T, expectations qh.ExpectationSequence, recvTimeout time.Duration) {
 	t.Helper()
 	if doNotLogDBQueries {
 		return
 	}
@@ -684,8 +684,8 @@ func expectNontxQueries(t *testing.T, expectations qh.ExpectationSequence) {
 				"query:%q\nmessage:%s\nexpectation:%s\nmatched:%t\nerror:%v\nhistory:%s",
 				got, result.Message, result.Expectation, result.Matched, result.Error, validator.History(),
 			))
-		case <-time.After(5 * time.Second):
-			t.Fatalf("no query received")
+		case <-time.After(recvTimeout):
+			require.FailNow(t, "no query received")
 			failed = true
 		}
 	}
diff --git a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go
index c2eb9c4af83..058ca29ff78 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go
@@ -17,15 +17,18 @@ limitations under the License.
 package vreplication
 
 import (
+	"context"
 	"io"
 	"sync"
 	"time"
 
-	"context"
+	"vitess.io/vitess/go/vt/vterrors"
 
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
 )
 
+const relayLogIOStalledMsg = "relay log I/O stalled"
+
 type relayLog struct {
 	ctx      context.Context
 	maxItems int
@@ -72,12 +75,18 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
 	if err := rl.checkDone(); err != nil {
 		return err
 	}
+	cancelTimer := rl.startSendTimer()
+	defer cancelTimer()
 	for rl.curSize > rl.maxSize || len(rl.items) >= rl.maxItems {
 		rl.canAccept.Wait()
+		if rl.timedout {
+			return vterrors.Wrap(errVPlayerStalled, relayLogIOStalledMsg)
+		}
 		if err := rl.checkDone(); err != nil {
 			return err
 		}
 	}
+	rl.timedout = false
 	rl.items = append(rl.items, events)
 	rl.curSize += eventsSize(events)
 	rl.hasItems.Broadcast()
@@ -92,7 +101,7 @@ func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) {
 	if err := rl.checkDone(); err != nil {
 		return nil, err
 	}
-	cancelTimer := rl.startTimer()
+	cancelTimer := rl.startFetchTimer()
 	defer cancelTimer()
 	for len(rl.items) == 0 && !rl.timedout {
 		rl.hasItems.Wait()
@@ -117,7 +126,33 @@ func (rl *relayLog) checkDone() error {
 	return nil
 }
 
-func (rl *relayLog) startTimer() (cancel func()) {
+// startSendTimer starts a timer that will wake up the sender if we hit
+// the vplayerProgressDeadline timeout. This ensures that we don't
+// block forever if the vplayer cannot process the previous relay log
+// contents in a timely manner, allowing us to provide the user with a
+// helpful error message.
+func (rl *relayLog) startSendTimer() (cancel func()) {
+	timer := time.NewTimer(vplayerProgressDeadline)
+	timerDone := make(chan struct{})
+	go func() {
+		select {
+		case <-timer.C:
+			rl.mu.Lock()
+			defer rl.mu.Unlock()
+			rl.timedout = true
+			rl.canAccept.Broadcast()
+		case <-timerDone:
+		}
+	}()
+	return func() {
+		timer.Stop()
+		close(timerDone)
+	}
+}
+
+// startFetchTimer starts a timer that will wake up the fetcher after
+// idleTimeout to be sure that we're regularly checking for new events.
+func (rl *relayLog) startFetchTimer() (cancel func()) { timer := time.NewTimer(idleTimeout) timerDone := make(chan struct{}) go func() { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 8f23f28c87d..9cbb51e76c6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -182,7 +182,7 @@ func testPlayerCopyCharPK(t *testing.T) { `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:BINARY charset:63 flags:20611} rows:{lengths:2 values:\\"c\\\\x00\\"}'.*`, "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst", "/update _vt.vreplication set state='Running", - )) + ), recvTimeout) expectData(t, "dst", [][]string{ {"a\000", "3"}, @@ -304,7 +304,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { }).Then(qh.Immediately( "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst", "/update _vt.vreplication set state='Running'", - ))) + )), recvTimeout) expectData(t, "dst", [][]string{ {"a", "1"}, @@ -415,7 +415,7 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) { // Wrap-up. "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst", "/update _vt.vreplication set state='Running'", - )) + ), recvTimeout) expectData(t, "dst", [][]string{ {"1", "B", "B", "3"}, @@ -790,7 +790,7 @@ func testPlayerCopyBigTable(t *testing.T) { // Copy is done. Go into running state. // All tables copied. Final catch up followed by Running state. "/update _vt.vreplication set state='Running'", - ))) + )), recvTimeout) expectData(t, "dst", [][]string{ {"1", "aaa"}, @@ -918,7 +918,7 @@ func testPlayerCopyWildcardRule(t *testing.T) { "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*src", // Copy is done. Go into running state. 
"/update _vt.vreplication set state='Running'", - ))) + )), recvTimeout) expectData(t, "src", [][]string{ {"1", "aaa"}, @@ -1078,7 +1078,7 @@ func testPlayerCopyTableContinuation(t *testing.T) { )).Then(qh.Immediately( "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*not_copied", "/update _vt.vreplication set state='Running'", - ))) + )), recvTimeout) expectData(t, "dst1", [][]string{ {"1", "insert in"}, @@ -1188,7 +1188,7 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) { `/insert into _vt.copy_state .*`, "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst", "/update _vt.vreplication set state='Running'", - ))) + )), recvTimeout) expectData(t, "dst", [][]string{ {"2", "copied"}, @@ -1279,7 +1279,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) { `/insert into _vt.copy_state .*`, "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst", "/update _vt.vreplication set state='Running'", - )) + ), recvTimeout) expectData(t, "dst", [][]string{ {"2", "copied"}, {"3", "uncopied"}, @@ -1659,7 +1659,7 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) { // copy of dst2 is done: delete from copy_state. "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst2", "/update _vt.vreplication set state", - )) + ), recvTimeout) expectData(t, "dst1", [][]string{ {"1", "aaa", "1aaa", "aaa1", "10"}, @@ -1826,7 +1826,7 @@ func testCopyInvisibleColumns(t *testing.T) { // copy of dst1 is done: delete from copy_state. "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst1", "/update _vt.vreplication set state='Running'", - )) + ), recvTimeout) expectData(t, "dst1", [][]string{ {"1", "10"}, diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index d7b60a104c4..c2eba565524 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -30,12 +30,25 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) +const failedToRecordHeartbeatMsg = "failed to record heartbeat" + +var ( + // At what point should we consider the vplayer to be stalled and return an error. + // 5 minutes is well beyond a reasonable amount of time for a transaction to be + // replicated. + vplayerProgressDeadline = time.Duration(5 * time.Minute) + + // The error to return when we have detected a stall in the vplayer. + errVPlayerStalled = errors.New("progress stalled; vplayer was unable to replicate the transaction in a timely manner; examine the target mysqld instance health and the replicated queries' EXPLAIN output to see why queries are taking unusually long") +) + // vplayer replays binlog events by pulling them from a vstreamer. 
 type vplayer struct {
 	vr *vreplicator
@@ -367,12 +380,13 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
 	return nil
 }
 
+// updatePos should get called at least once every vreplicationMinimumHeartbeatUpdateInterval.
 func (vp *vplayer) updatePos(ctx context.Context, ts int64) (posReached bool, err error) {
-	vp.numAccumulatedHeartbeats = 0
 	update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get(), vreplicationStoreCompressedGTID)
 	if _, err := vp.query(ctx, update); err != nil {
 		return false, fmt.Errorf("error %v updating position", err)
 	}
+	vp.numAccumulatedHeartbeats = 0
 	vp.unsavedEvent = nil
 	vp.timeLastSaved = time.Now()
 	vp.vr.stats.SetLastPosition(vp.pos)
@@ -399,8 +413,16 @@ func (vp *vplayer) recordHeartbeat() error {
 	if !vp.mustUpdateHeartbeat() {
 		return nil
 	}
+	if err := vp.vr.updateHeartbeatTime(tm); err != nil {
+		return vterrors.Wrapf(errVPlayerStalled, "%s: %v", failedToRecordHeartbeatMsg, err)
+	}
+	// Only reset the pending heartbeat count if the update was successful.
+	// Otherwise the vplayer may not actually be making progress and nobody
+	// is aware of it -- resulting in the com_binlog_dump connection on the
+	// source that is managed by the binlog_player getting closed by mysqld
+	// when the source_net_timeout is hit.
 	vp.numAccumulatedHeartbeats = 0
-	return vp.vr.updateHeartbeatTime(tm)
+	return nil
 }
 
 // applyEvents is the main thread that applies the events. It has the following use
@@ -438,7 +460,7 @@ func (vp *vplayer) recordHeartbeat() error {
 // current position to be saved.
 //
 // In order to handle the above use cases, we use an implicit transaction scheme:
-// A BEGIN does not really start a transaction. Ony a ROW event does. With this
+// A BEGIN does not really start a transaction. Only a ROW event does. With this
 // approach, no transaction gets started if an empty one arrives. If a we receive
 // a commit, and a we are not in a transaction, we infer that the transaction was
 // empty, and remember it as an unsaved event. The next GTID event will reset the
@@ -497,6 +519,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 			return nil
 		}
 	}
+
 	for i, events := range items {
 		for j, event := range events {
 			if event.Timestamp != 0 {
@@ -526,7 +549,17 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 			if err := vp.applyEvent(ctx, event, mustSave); err != nil {
 				if err != io.EOF {
 					vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
-					log.Errorf("Error applying event: %s", err.Error())
+					var table, tableLogMsg string
+					switch {
+					case event.GetFieldEvent() != nil:
+						table = event.GetFieldEvent().TableName
+					case event.GetRowEvent() != nil:
+						table = event.GetRowEvent().TableName
+					}
+					if table != "" {
+						tableLogMsg = fmt.Sprintf(" for table %s", table)
+					}
+					log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error())
 				}
 				return err
 			}
@@ -635,7 +668,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
 			log.Infof("Error applying row event: %s", err.Error())
 			return err
 		}
-		//Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed time for the Row event
+		// Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed
+		// time for the Row event.
stats.Send(fmt.Sprintf("%v", event.RowEvent)) case binlogdatapb.VEventType_OTHER: if vp.vr.dbClient.InTransaction { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index f79f7a42744..b1925c3c64f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -28,17 +28,16 @@ import ( "testing" "time" - "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" - - "vitess.io/vitess/go/vt/vttablet" - "github.com/nsf/jsondiff" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/vttablet" + "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" qh "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication/queryhistory" @@ -119,7 +118,7 @@ func TestPlayerGeneratedInvisiblePrimaryKey(t *testing.T) { for _, tcases := range testcases { execStatements(t, []string{tcases.input}) output := qh.Expect(tcases.output) - expectNontxQueries(t, output) + expectNontxQueries(t, output, recvTimeout) if tcases.table != "" { expectData(t, tcases.table, tcases.data) } @@ -182,7 +181,7 @@ func TestPlayerInvisibleColumns(t *testing.T) { for _, tcases := range testcases { execStatements(t, []string{tcases.input}) output := qh.Expect(tcases.output) - expectNontxQueries(t, output) + expectNontxQueries(t, output, recvTimeout) time.Sleep(1 * time.Second) if tcases.table != "" { expectData(t, tcases.table, tcases.data) @@ -272,7 +271,7 @@ func TestVReplicationTimeUpdated(t *testing.T) { require.NoError(t, err) return timeUpdated, transactionTimestamp, timeHeartbeat } - expectNontxQueries(t, qh.Expect("insert into t1(id,val) values (1,'aaa')")) + expectNontxQueries(t, qh.Expect("insert into t1(id,val) values (1,'aaa')"), recvTimeout) time.Sleep(1 * time.Second) timeUpdated1, transactionTimestamp1, timeHeartbeat1 := getTimestamps() time.Sleep(2 * time.Second) @@ -2846,7 +2845,7 @@ func TestGeneratedColumns(t *testing.T) { for _, tcases := range testcases { execStatements(t, []string{tcases.input}) output := qh.Expect(tcases.output) - expectNontxQueries(t, output) + expectNontxQueries(t, output, recvTimeout) if tcases.table != "" { expectData(t, tcases.table, tcases.data) } @@ -2916,7 +2915,7 @@ func TestPlayerInvalidDates(t *testing.T) { for _, tcases := range testcases { execStatements(t, []string{tcases.input}) output := qh.Expect(tcases.output) - expectNontxQueries(t, output) + expectNontxQueries(t, output, recvTimeout) if tcases.table != "" { expectData(t, tcases.table, tcases.data) @@ -3054,7 +3053,7 @@ func TestPlayerNoBlob(t *testing.T) { for _, tcases := range testcases { execStatements(t, []string{tcases.input}) output := qh.Expect(tcases.output) - expectNontxQueries(t, output) + expectNontxQueries(t, output, recvTimeout) time.Sleep(1 * time.Second) if tcases.table != "" { expectData(t, tcases.table, tcases.data) @@ -3291,7 +3290,7 @@ func TestPlayerBatchMode(t *testing.T) { for _, stmt := range tcase.output { require.LessOrEqual(t, len(stmt), maxBatchSize, "expected output statement is longer than the max batch size (%d): %s", maxBatchSize, stmt) } - expectNontxQueries(t, output) + expectNontxQueries(t, output, recvTimeout) 
time.Sleep(1 * time.Second) if tcase.table != "" { expectData(t, tcase.table, tcase.data) @@ -3317,6 +3316,154 @@ func TestPlayerBatchMode(t *testing.T) { } } +// TestPlayerStalls confirms that the vplayer will detect a stall and generate +// a meaningful error -- which is stored in the vreplication record and the +// vreplication_log table as well as being logged -- when it does. +func TestPlayerStalls(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer deleteTablet(addTablet(100)) + + // We want to check for the expected log messages. + ole := log.Errorf + logger := logutil.NewMemoryLogger() + log.Errorf = logger.Errorf + + ovmhu := vreplicationMinimumHeartbeatUpdateInterval + ogvpt := vplayerProgressDeadline + orlmi := relayLogMaxItems + ord := retryDelay + defer func() { + log.Errorf = ole + vreplicationMinimumHeartbeatUpdateInterval = ovmhu + vplayerProgressDeadline = ogvpt + relayLogMaxItems = orlmi + retryDelay = ord + }() + + // Shorten the deadline for the test. + vplayerProgressDeadline = 5 * time.Second + // Shorten the time for a required heartbeat recording for the test. + vreplicationMinimumHeartbeatUpdateInterval = 5 + // So each relay log batch will be a single statement transaction. + relayLogMaxItems = 1 + + // Don't retry the workflow if it goes into the error state. + retryDelay = 10 * time.Minute + maxTimeToRetryError = 1 * time.Second + + // A channel to communicate across goroutines. + done := make(chan struct{}) + + testTimeout := vplayerProgressDeadline * 10 + + execStatements(t, []string{ + "create table t1(id bigint, val1 varchar(1000), primary key(id))", + fmt.Sprintf("create table %s.t1(id bigint, val1 varchar(1000), primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t1", + fmt.Sprintf("drop table %s.t1", vrepldb), + }) + + filter := &binlogdatapb.Filter{} + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + OnDdl: binlogdatapb.OnDDLAction_IGNORE, + } + + testcases := []struct { + name string + input []string + output qh.ExpectationSequencer + preFunc func() + postFunc func() + expectQueries bool + }{ + { + name: "stall in relay log IO", + input: []string{ + "set @@session.binlog_format='STATEMENT'", // As we are using the sleep function in the query to simulate a stall + "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", // This should be the only query that gets replicated + // This will cause a stall in the vplayer. + fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(vplayerProgressDeadline.Seconds()+5)), + }, + expectQueries: true, + output: qh.Expect( + "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc')", + // This will cause a stall to be detected in the vplayer. This is + // what we want in the end, our improved error message (which also + // gets logged). 
+ fmt.Sprintf("update t1 set val1 = concat(sleep (%d), val1)", int64(vplayerProgressDeadline.Seconds()+5)), + "/update _vt.vreplication set message=.*progress stalled.*", + ), + postFunc: func() { + time.Sleep(vplayerProgressDeadline) + log.Flush() + require.Contains(t, logger.String(), relayLogIOStalledMsg, "expected log message not found") + execStatements(t, []string{"set @@session.binlog_format='ROW'"}) + }, + }, + { + name: "stall in heartbeat recording", + input: []string{ + fmt.Sprintf("set @@session.innodb_lock_wait_timeout=%d", int64(vplayerProgressDeadline.Seconds()+5)), + "insert into t1(id, val1) values (10, 'mmm'), (11, 'nnn'), (12, 'ooo')", + "update t1 set val1 = 'yyy' where id = 10", + }, + preFunc: func() { + dbc, err := env.Mysqld.GetAllPrivsConnection(context.Background()) + require.NoError(t, err) + _, err = dbc.ExecuteFetch("begin", 1, false) + require.NoError(t, err) + // The row locks held for this will block us from recording the heartbeats. + _, err = dbc.ExecuteFetch("select * from _vt.vreplication for update", 1000, false) + require.NoError(t, err) + go func() { + defer func() { + dbc.Close() + }() + select { + case <-done: + case <-ctx.Done(): + } + }() + }, + postFunc: func() { + // Sleep long enough that we fail to record the heartbeat. + to := time.Duration(int64(vreplicationMinimumHeartbeatUpdateInterval*2) * int64(time.Second)) + time.Sleep(to) + // Signal the preFunc goroutine to close the connection holding the row locks. + done <- struct{}{} + log.Flush() + require.Contains(t, logger.String(), failedToRecordHeartbeatMsg, "expected log message not found") + }, + // Nothing should get replicated because of the exclusing row locks + // held in the other connection from our preFunc. + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + vrcancel, _ := startVReplication(t, bls, "") + defer vrcancel() + execStatements(t, tc.input) + if tc.preFunc != nil { + tc.preFunc() + } + if tc.expectQueries { + expectNontxQueries(t, tc.output, testTimeout) + } + if tc.postFunc != nil { + tc.postFunc() + } + logger.Clear() + }) + } +} + func expectJSON(t *testing.T, table string, values [][]string, id int, exec func(ctx context.Context, query string) (*sqltypes.Result, error)) { t.Helper() @@ -3327,26 +3474,16 @@ func expectJSON(t *testing.T, table string, values [][]string, id int, exec func query = fmt.Sprintf("select * from %s where id=%d", table, id) } qr, err := exec(context.Background(), query) - if err != nil { - t.Error(err) - return - } + require.NoError(t, err) if len(values) != len(qr.Rows) { t.Fatalf("row counts don't match: %d, want %d", len(qr.Rows), len(values)) } for i, row := range values { - if len(row) != len(qr.Rows[i]) { - t.Fatalf("Too few columns, \nrow: %d, \nresult: %d:%v, \nwant: %d:%v", i, len(qr.Rows[i]), qr.Rows[i], len(row), row) - } - if qr.Rows[i][0].ToString() != row[0] { - t.Fatalf("Id mismatch: want %s, got %s", qr.Rows[i][0].ToString(), row[0]) - } - + require.Len(t, row, len(qr.Rows[i]), "Too few columns, \nrow: %d, \nresult: %d:%v, \nwant: %d:%v", i, len(qr.Rows[i]), qr.Rows[i], len(row), row) + require.Equal(t, qr.Rows[i][0].ToString(), row[0], "Id mismatch: want %s, got %s", qr.Rows[i][0].ToString(), row[0]) opts := jsondiff.DefaultConsoleOptions() compare, s := jsondiff.Compare(qr.Rows[i][1].Raw(), []byte(row[1]), &opts) - if compare != jsondiff.FullMatch { - t.Errorf("Diff:\n%s\n", s) - } + require.Equal(t, compare, jsondiff.FullMatch, "Diff:\n%s\n", s) } } @@ -3359,9 +3496,7 @@ func startVReplication(t 
*testing.T, bls *binlogdatapb.BinlogSource, pos string) // fake workflow type as MoveTables so that we can test with "noblob" binlog row image query := binlogplayer.CreateVReplication("test", bls, pos, 9223372036854775807, 9223372036854775807, 0, vrepldb, binlogdatapb.VReplicationWorkflowType_MoveTables, 0, false) qr, err := playerEngine.Exec(query) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", @@ -3372,9 +3507,8 @@ func startVReplication(t *testing.T, bls *binlogdatapb.BinlogSource, pos string) t.Helper() once.Do(func() { query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) - if _, err := playerEngine.Exec(query); err != nil { - t.Fatal(err) - } + _, err := playerEngine.Exec(query) + require.NoError(t, err) expectDeleteQueries(t) }) }, int(qr.InsertID) diff --git a/tools/unit_test_runner.sh b/tools/unit_test_runner.sh index d48f7162a4b..efcd03aec0c 100755 --- a/tools/unit_test_runner.sh +++ b/tools/unit_test_runner.sh @@ -84,7 +84,7 @@ for pkg in $flaky_tests; do max_attempts=3 attempt=1 # Set a timeout because some tests may deadlock when they flake. - until go test -timeout 2m $VT_GO_PARALLEL $pkg -v -count=1; do + until go test -timeout 5m $VT_GO_PARALLEL $pkg -v -count=1; do echo "FAILED (try $attempt/$max_attempts) in $pkg (return code $?). See above for errors." if [ $((++attempt)) -gt $max_attempts ]; then echo "ERROR: Flaky Go unit tests in package $pkg failed too often (after $max_attempts retries). Please reduce the flakiness."
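
Note on the stall-detection mechanism in relaylog.go above: sync.Cond has no Wait-with-timeout, so startSendTimer works around that by arming a timer goroutine that sets a flag under the mutex and then Broadcasts, letting the blocked Wait wake up and observe the flag. The standalone sketch below isolates that pattern; it is illustrative only, and boundedBuffer, errStalled, and the sizes are invented names for this sketch, not Vitess APIs.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errStalled = errors.New("consumer stalled; buffer was not drained before the deadline")

// boundedBuffer blocks senders when full, like the relay log above.
type boundedBuffer struct {
	mu        sync.Mutex
	canAccept *sync.Cond
	items     []string
	maxItems  int
	timedout  bool
}

func newBoundedBuffer(maxItems int) *boundedBuffer {
	b := &boundedBuffer{maxItems: maxItems}
	b.canAccept = sync.NewCond(&b.mu)
	return b
}

// Send blocks while the buffer is full. Because sync.Cond cannot time
// out on its own, a background timer sets a flag under the lock and
// Broadcasts, so the blocked Wait wakes up and can check the flag.
func (b *boundedBuffer) Send(item string, deadline time.Duration) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	timer := time.NewTimer(deadline)
	timerDone := make(chan struct{})
	defer func() {
		timer.Stop()
		close(timerDone)
	}()
	go func() {
		select {
		case <-timer.C:
			b.mu.Lock()
			defer b.mu.Unlock()
			b.timedout = true
			b.canAccept.Broadcast() // wake the waiter so it can fail fast
		case <-timerDone:
		}
	}()

	for len(b.items) >= b.maxItems {
		b.canAccept.Wait() // releases b.mu while blocked, reacquires on wakeup
		if b.timedout {
			return errStalled
		}
	}
	b.timedout = false
	b.items = append(b.items, item)
	return nil
}

func main() {
	b := newBoundedBuffer(1)
	if err := b.Send("first", time.Second); err != nil { // fits immediately
		fmt.Println(err)
	}
	// Nothing drains the buffer, so this send blocks until the timer
	// fires and the stall error is returned.
	if err := b.Send("second", time.Second); err != nil {
		fmt.Println(err)
	}
}

The flag-plus-Broadcast shape keeps the timeout inside the condition-variable protocol the relay log already uses; replacing its signaling with channels just to gain a deadline would have been a much larger change.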