Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve handling of vplayer stalls #15797

Merged
merged 40 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
bee30d4
Improve handling of vplayer stalls
mattlord Apr 25, 2024
6e929fe
Add progress timer and test
mattlord May 14, 2024
5f05388
Fix table name logging
mattlord May 16, 2024
e9e5fc5
Make error handling concurrency safe
mattlord May 16, 2024
b72bfbe
More tweaks from self review
mattlord May 16, 2024
ae275a5
Lower progress timeout as the overall stall time is a multiple
mattlord May 16, 2024
99e2d86
Move progress tracking to vplayer
mattlord May 16, 2024
d4517f2
Only use the stall detection for replicated user events
mattlord May 16, 2024
c54c507
Merge remote-tracking branch 'origin/main' into vplayer_batch_trx_tim…
mattlord May 17, 2024
fe85d9f
Tweak test a bit
mattlord May 17, 2024
61c2b73
Try to deflake CI runs
mattlord May 17, 2024
d00ff6c
WiP for row based unit test case
mattlord May 17, 2024
b45ac01
Derace work
mattlord May 17, 2024
000e5bc
Add a flag for this new behavior, which will later be disabled by def…
mattlord May 18, 2024
196bab8
Improve heartbeat management and errors
mattlord May 18, 2024
356e2c0
Update help text
mattlord May 18, 2024
e60b61b
Add ROW based test case and correct vtcombo help output
mattlord May 18, 2024
24af4aa
Test and error message corrections/improvements
mattlord May 19, 2024
f496bdf
Use better flag name
mattlord May 19, 2024
eeb2ee9
Reset min heartbeat interval
mattlord May 20, 2024
fb25daa
Changes after self review
mattlord May 20, 2024
729cf34
Minor changes from second self review
mattlord May 20, 2024
341e232
Merge remote-tracking branch 'origin/main' into vplayer_batch_trx_tim…
mattlord May 20, 2024
1749d3d
Remove explicit schema reload
mattlord May 20, 2024
5075125
Fix goroutine leak
mattlord May 24, 2024
7b5c4c5
Move progress-deadline from tablet flag to workflow option
mattlord May 24, 2024
fb0f29e
WiP
mattlord May 24, 2024
b060e09
Revert move to workflow option work
mattlord May 25, 2024
bfd67e5
Reapply vplayer improvement reverted from flag work
mattlord May 25, 2024
eeba51b
Reapply one other comment improvement
mattlord May 25, 2024
3839e90
Add leak checking unit test
mattlord May 25, 2024
d2fea8d
Add a few more expected TestMain goroutines to ignore list
mattlord May 26, 2024
c666b14
Remove stallHandler related code
mattlord May 28, 2024
0e88d6c
Re-add vplayer-progress-deadline and use it in relayLog
mattlord May 30, 2024
94877d2
Correct flag help output
mattlord May 31, 2024
a79516f
Minor changes after self review
mattlord May 31, 2024
8ea60a1
Merge remote-tracking branch 'origin/main' into vplayer_batch_trx_tim…
mattlord Jun 4, 2024
c192cc4
Kick DCO
mattlord Jun 11, 2024
5d4219c
Remove the flag since you shouldn't ever need to chane it
mattlord Jun 14, 2024
a81887a
Merge remote-tracking branch 'origin/main' into vplayer_batch_trx_tim…
mattlord Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var (
"--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
"--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
// This variable can be used within specific tests to alter vttablet behavior.
extraVTTabletArgs = []string{}

parallelInsertWorkers = "--vreplication-parallel-insert-workers=4"
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/sqlparser"

_flag "vitess.io/vitess/go/internal/flag"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
Expand All @@ -46,6 +44,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/queryservice"
Expand Down Expand Up @@ -74,6 +73,7 @@ var (
testForeignKeyQueries = false
testSetForeignKeyQueries = false
doNotLogDBQueries = false
recvTimeout = 5 * time.Second
)

type LogExpectation struct {
Expand Down Expand Up @@ -492,14 +492,14 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu
return qr, err
}

func (dc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
func (dbc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
queries, err := sqlparser.NewTestParser().SplitStatementToPieces(query)
if err != nil {
return nil, err
}
results := make([]*sqltypes.Result, 0, len(queries))
for _, query := range queries {
qr, err := dc.ExecuteFetch(query, maxrows)
qr, err := dbc.ExecuteFetch(query, maxrows)
if err != nil {
return nil, err
}
Expand All @@ -518,7 +518,7 @@ func expectDeleteQueries(t *testing.T) {
"/delete from _vt.vreplication",
"/delete from _vt.copy_state",
"/delete from _vt.post_copy_action",
))
), recvTimeout)
}

func deleteAllVReplicationStreams(t *testing.T) {
Expand Down Expand Up @@ -635,7 +635,7 @@ func expectDBClientQueries(t *testing.T, expectations qh.ExpectationSequence, sk
))
}
case <-time.After(5 * time.Second):
t.Fatalf("no query received")
require.FailNow(t, "no query received")
failed = true
}
}
Expand All @@ -656,7 +656,7 @@ func expectDBClientQueries(t *testing.T, expectations qh.ExpectationSequence, sk

// expectNontxQueries disregards transactional statements like begin and commit.
// It also disregards updates to _vt.vreplication.
func expectNontxQueries(t *testing.T, expectations qh.ExpectationSequence) {
func expectNontxQueries(t *testing.T, expectations qh.ExpectationSequence, recvTimeout time.Duration) {
t.Helper()
if doNotLogDBQueries {
return
Expand Down Expand Up @@ -684,8 +684,8 @@ func expectNontxQueries(t *testing.T, expectations qh.ExpectationSequence) {
"query:%q\nmessage:%s\nexpectation:%s\nmatched:%t\nerror:%v\nhistory:%s",
got, result.Message, result.Expectation, result.Matched, result.Error, validator.History(),
))
case <-time.After(5 * time.Second):
t.Fatalf("no query received")
case <-time.After(recvTimeout):
require.FailNow(t, "no query received")
failed = true
}
}
Expand Down
41 changes: 38 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/relaylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ limitations under the License.
package vreplication

import (
"context"
"io"
"sync"
"time"

"context"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const relayLogIOStalledMsg = "relay log I/O stalled"

type relayLog struct {
ctx context.Context
maxItems int
Expand Down Expand Up @@ -72,12 +75,18 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
if err := rl.checkDone(); err != nil {
return err
}
cancelTimer := rl.startSendTimer()
defer cancelTimer()
for rl.curSize > rl.maxSize || len(rl.items) >= rl.maxItems {
rl.canAccept.Wait()
if rl.timedout {
return vterrors.Wrap(errVPlayerStalled, relayLogIOStalledMsg)
}
if err := rl.checkDone(); err != nil {
return err
}
}
rl.timedout = false
rl.items = append(rl.items, events)
rl.curSize += eventsSize(events)
rl.hasItems.Broadcast()
Expand All @@ -92,7 +101,7 @@ func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) {
if err := rl.checkDone(); err != nil {
return nil, err
}
cancelTimer := rl.startTimer()
cancelTimer := rl.startFetchTimer()
defer cancelTimer()
for len(rl.items) == 0 && !rl.timedout {
rl.hasItems.Wait()
Expand All @@ -117,7 +126,33 @@ func (rl *relayLog) checkDone() error {
return nil
}

func (rl *relayLog) startTimer() (cancel func()) {
// startSendTimer starts a timer that will wake up the sender if we hit
// the vplayerProgressDeadline timeout. This ensures that we don't
// block forever if the vplayer cannot process the previous relay log
// contents in a timely manner; allowing us to provide the user with a
// helpful error message.
func (rl *relayLog) startSendTimer() (cancel func()) {
timer := time.NewTimer(vplayerProgressDeadline)
timerDone := make(chan struct{})
go func() {
select {
case <-timer.C:
rl.mu.Lock()
defer rl.mu.Unlock()
rl.timedout = true
rl.canAccept.Broadcast()
case <-timerDone:
}
}()
return func() {
timer.Stop()
close(timerDone)
}
}

// startFetchTimer starts a timer that will wake up the fetcher after
// idleTimeout to be sure that we're regularly checking for new events.
func (rl *relayLog) startFetchTimer() (cancel func()) {
timer := time.NewTimer(idleTimeout)
timerDone := make(chan struct{})
go func() {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func testPlayerCopyCharPK(t *testing.T) {
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:BINARY charset:63 flags:20611} rows:{lengths:2 values:\\"c\\\\x00\\"}'.*`,
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running",
))
), recvTimeout)

expectData(t, "dst", [][]string{
{"a\000", "3"},
Expand Down Expand Up @@ -304,7 +304,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
}).Then(qh.Immediately(
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "dst", [][]string{
{"a", "1"},
Expand Down Expand Up @@ -415,7 +415,7 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) {
// Wrap-up.
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running'",
))
), recvTimeout)

expectData(t, "dst", [][]string{
{"1", "B", "B", "3"},
Expand Down Expand Up @@ -790,7 +790,7 @@ func testPlayerCopyBigTable(t *testing.T) {
// Copy is done. Go into running state.
// All tables copied. Final catch up followed by Running state.
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "dst", [][]string{
{"1", "aaa"},
Expand Down Expand Up @@ -918,7 +918,7 @@ func testPlayerCopyWildcardRule(t *testing.T) {
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*src",
// Copy is done. Go into running state.
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "src", [][]string{
{"1", "aaa"},
Expand Down Expand Up @@ -1078,7 +1078,7 @@ func testPlayerCopyTableContinuation(t *testing.T) {
)).Then(qh.Immediately(
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*not_copied",
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "dst1", [][]string{
{"1", "insert in"},
Expand Down Expand Up @@ -1188,7 +1188,7 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) {
`/insert into _vt.copy_state .*`,
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running'",
)))
)), recvTimeout)

expectData(t, "dst", [][]string{
{"2", "copied"},
Expand Down Expand Up @@ -1279,7 +1279,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
`/insert into _vt.copy_state .*`,
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst",
"/update _vt.vreplication set state='Running'",
))
), recvTimeout)
expectData(t, "dst", [][]string{
{"2", "copied"},
{"3", "uncopied"},
Expand Down Expand Up @@ -1659,7 +1659,7 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) {
// copy of dst2 is done: delete from copy_state.
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst2",
"/update _vt.vreplication set state",
))
), recvTimeout)

expectData(t, "dst1", [][]string{
{"1", "aaa", "1aaa", "aaa1", "10"},
Expand Down Expand Up @@ -1826,7 +1826,7 @@ func testCopyInvisibleColumns(t *testing.T) {
// copy of dst1 is done: delete from copy_state.
"/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst1",
"/update _vt.vreplication set state='Running'",
))
), recvTimeout)

expectData(t, "dst1", [][]string{
{"1", "10"},
Expand Down
44 changes: 39 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,25 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const failedToRecordHeartbeatMsg = "failed to record heartbeat"

var (
// At what point should we consider the vplayer to be stalled and return an error.
// 5 minutes is well beyond a reasonable amount of time for a transaction to be
// replicated.
vplayerProgressDeadline = time.Duration(5 * time.Minute)

// The error to return when we have detected a stall in the vplayer.
errVPlayerStalled = errors.New("progress stalled; vplayer was unable to replicate the transaction in a timely manner; examine the target mysqld instance health and the replicated queries' EXPLAIN output to see why queries are taking unusually long")
)

// vplayer replays binlog events by pulling them from a vstreamer.
type vplayer struct {
vr *vreplicator
Expand Down Expand Up @@ -367,12 +380,13 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
return nil
}

// updatePos should get called at a minimum of vreplicationMinimumHeartbeatUpdateInterval.
func (vp *vplayer) updatePos(ctx context.Context, ts int64) (posReached bool, err error) {
vp.numAccumulatedHeartbeats = 0
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get(), vreplicationStoreCompressedGTID)
if _, err := vp.query(ctx, update); err != nil {
return false, fmt.Errorf("error %v updating position", err)
}
vp.numAccumulatedHeartbeats = 0
vp.unsavedEvent = nil
vp.timeLastSaved = time.Now()
vp.vr.stats.SetLastPosition(vp.pos)
Expand All @@ -399,8 +413,16 @@ func (vp *vplayer) recordHeartbeat() error {
if !vp.mustUpdateHeartbeat() {
return nil
}
if err := vp.vr.updateHeartbeatTime(tm); err != nil {
return vterrors.Wrapf(errVPlayerStalled, fmt.Sprintf("%s: %v", failedToRecordHeartbeatMsg, err))
}
// Only reset the pending heartbeat count if the update was successful.
// Otherwise the vplayer may not actually be making progress and nobody
// is aware of it -- resulting in the com_binlog_dump connection on the
// source that is managed by the binlog_player getting closed by mysqld
// when the source_net_timeout is hit.
vp.numAccumulatedHeartbeats = 0
return vp.vr.updateHeartbeatTime(tm)
return nil
}

// applyEvents is the main thread that applies the events. It has the following use
Expand Down Expand Up @@ -438,7 +460,7 @@ func (vp *vplayer) recordHeartbeat() error {
// current position to be saved.
//
// In order to handle the above use cases, we use an implicit transaction scheme:
// A BEGIN does not really start a transaction. Ony a ROW event does. With this
// A BEGIN does not really start a transaction. Only a ROW event does. With this
// approach, no transaction gets started if an empty one arrives. If a we receive
// a commit, and a we are not in a transaction, we infer that the transaction was
// empty, and remember it as an unsaved event. The next GTID event will reset the
Expand Down Expand Up @@ -497,6 +519,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
return nil
}
}

for i, events := range items {
for j, event := range events {
if event.Timestamp != 0 {
Expand Down Expand Up @@ -526,7 +549,17 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if err := vp.applyEvent(ctx, event, mustSave); err != nil {
if err != io.EOF {
vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
log.Errorf("Error applying event: %s", err.Error())
var table, tableLogMsg string
switch {
case event.GetFieldEvent() != nil:
table = event.GetFieldEvent().TableName
case event.GetRowEvent() != nil:
table = event.GetRowEvent().TableName
}
if table != "" {
tableLogMsg = fmt.Sprintf(" for table %s", table)
}
log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error())
Comment on lines +552 to +562
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unrelated logging improvement request made directly via Slack.

}
return err
}
Expand Down Expand Up @@ -635,7 +668,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
log.Infof("Error applying row event: %s", err.Error())
return err
}
//Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed time for the Row event
// Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed
// time for the Row event.
stats.Send(fmt.Sprintf("%v", event.RowEvent))
case binlogdatapb.VEventType_OTHER:
if vp.vr.dbClient.InTransaction {
Expand Down
Loading
Loading