Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backup: reset timeout on store level (#55526) #57667

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions br/pkg/backup/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (r ResponseAndStore) GetStoreID() uint64 {

// timeoutRecv cancel the context if `Refresh()` is not called within the specified time `timeout`.
type timeoutRecv struct {
storeID uint64
wg sync.WaitGroup
parentCtx context.Context
cancel context.CancelCauseFunc
Expand Down Expand Up @@ -98,15 +99,17 @@ func (trecv *timeoutRecv) loop(timeout time.Duration) {
return
}
case <-ticker.C:
log.Warn("receive a backup response timeout")
log.Warn("wait backup response timeout, cancel the backup",
zap.Duration("timeout", timeout), zap.Uint64("storeID", trecv.storeID))
trecv.cancel(errors.Errorf("receive a backup response timeout"))
}
}
}

func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Context, *timeoutRecv) {
func StartTimeoutRecv(ctx context.Context, timeout time.Duration, storeID uint64) (context.Context, *timeoutRecv) {
cctx, cancel := context.WithCancelCause(ctx)
trecv := &timeoutRecv{
storeID: storeID,
parentCtx: ctx,
cancel: cancel,
refresh: make(chan struct{}),
Expand All @@ -117,15 +120,11 @@ func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Conte
}

func doSendBackup(
pctx context.Context,
ctx context.Context,
client backuppb.BackupClient,
req backuppb.BackupRequest,
respFn func(*backuppb.BackupResponse) error,
) error {
// Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to
// terminate the backup if it does not receive any new response for a long time.
ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse)
defer timerecv.Stop()
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
Expand Down Expand Up @@ -170,7 +169,6 @@ func doSendBackup(

for {
resp, err := bCli.Recv()
timerecv.Refresh()
if err != nil {
if errors.Cause(err) == io.EOF { // nolint:errorlint
logutil.CL(ctx).Debug("backup streaming finish",
Expand All @@ -193,7 +191,7 @@ func doSendBackup(
}

func startBackup(
ctx context.Context,
pctx context.Context,
storeID uint64,
backupReq backuppb.BackupRequest,
backupCli backuppb.BackupClient,
Expand All @@ -202,14 +200,21 @@ func startBackup(
) error {
// this goroutine handle the response from a single store
select {
case <-ctx.Done():
return ctx.Err()
case <-pctx.Done():
return pctx.Err()
default:
logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID))
// Send backup request to the store.
// handle the backup response or internal error here.
// handle the store error(reboot or network partition) outside.
reqs := SplitBackupReqRanges(backupReq, concurrency)
logutil.CL(pctx).Info("starting backup to the corresponding store", zap.Uint64("storeID", storeID),
zap.Int("requestCount", len(reqs)), zap.Uint("concurrency", concurrency))

// Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to
// terminate the backup if it does not receive any new response for a long time.
ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse, storeID)
defer timerecv.Stop()

pool := tidbutil.NewWorkerPool(concurrency, "store_backup")
eg, ectx := errgroup.WithContext(ctx)
for i, req := range reqs {
Expand All @@ -219,8 +224,10 @@ func startBackup(
retry := -1
return utils.WithRetry(ectx, func() error {
retry += 1
logutil.CL(ectx).Info("backup to store", zap.Uint64("storeID", storeID),
zap.Int("retry", retry), zap.Int("reqIndex", reqIndex))
if retry > 1 {
logutil.CL(ectx).Info("retry backup to store", zap.Uint64("storeID", storeID),
zap.Int("retry", retry), zap.Int("reqIndex", reqIndex))
}
return doSendBackup(ectx, backupCli, bkReq, func(resp *backuppb.BackupResponse) error {
// Forward all responses (including error).
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
Expand Down Expand Up @@ -263,6 +270,8 @@ func startBackup(
Resp: resp,
StoreID: storeID,
}:
// reset timeout when receive a response
timerecv.Refresh()
}
return nil
})
Expand Down Expand Up @@ -326,7 +335,7 @@ func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetr
// reset the state
sendAll = false
clear(newJoinStoresMap)
logutil.CL(ctx).Info("check store changes every tick")
logutil.CL(ctx).Info("check store changes every 30s")
err := watcher.Step(ctx)
if err != nil {
logutil.CL(ctx).Warn("failed to watch store changes, ignore it", zap.Error(err))
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/backup/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package backup

import (
"context"
"io"
"testing"
"time"

Expand Down Expand Up @@ -58,41 +57,42 @@ func TestTimeoutRecv(t *testing.T) {
TimeoutOneResponse = time.Millisecond * 800
// Just Timeout Once
{
err := doSendBackup(ctx, &MockBackupClient{
err := startBackup(ctx, 0, backuppb.BackupRequest{}, &MockBackupClient{
recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
time.Sleep(time.Second)
require.Error(t, ctx.Err())
return nil, io.EOF
return nil, ctx.Err()
},
}, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
require.NoError(t, err)
}, 1, nil)
require.Error(t, err)
}

// Timeout Not At First
{
count := 0
err := doSendBackup(ctx, &MockBackupClient{
err := startBackup(ctx, 0, backuppb.BackupRequest{}, &MockBackupClient{
recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
require.NoError(t, ctx.Err())
if count == 15 {
time.Sleep(time.Second)
require.Error(t, ctx.Err())
return nil, io.EOF
return nil, ctx.Err()
}
count += 1
time.Sleep(time.Millisecond * 80)
return &backuppb.BackupResponse{}, nil
},
}, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
require.NoError(t, err)
}, 1, make(chan *ResponseAndStore, 15))
require.Error(t, err)
require.Equal(t, count, 15)
}
}

func TestTimeoutRecvCancel(t *testing.T) {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)

_, trecv := StartTimeoutRecv(cctx, time.Hour)
_, trecv := StartTimeoutRecv(cctx, time.Hour, 0)
cancel()
trecv.wg.Wait()
}
Expand All @@ -102,7 +102,7 @@ func TestTimeoutRecvCanceled(t *testing.T) {
cctx, cancel := context.WithCancel(ctx)
defer cancel()

tctx, trecv := StartTimeoutRecv(cctx, time.Hour)
tctx, trecv := StartTimeoutRecv(cctx, time.Hour, 0)
trecv.Stop()
require.Equal(t, "context canceled", tctx.Err().Error())
}
Loading