diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 33f40b330b7..53f5fcead8d 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -672,11 +672,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Unreachable. err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - if !vs.isRetriableError(err) { + + retry, ignoreTablet := vs.isRetriableError(err) + if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } - ignoreTablets[tablet.Alias.String()] = tablet.GetAlias() + if ignoreTablet { + ignoreTablets[tablet.Alias.String()] = tablet.GetAlias() + } + errCount++ if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) @@ -686,18 +691,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } -func (vs *vstream) isRetriableError(err error) bool { +func (vs *vstream) isRetriableError(err error) (bool, bool) { errCode := vterrors.Code(err) - if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE || errCode == vtrpcpb.Code_NOT_FOUND { - return true + if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { + return true, false } - if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { - return true + if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || errCode == vtrpcpb.Code_NOT_FOUND { + return true, true } - return false + return false, false } // sendAll sends a group of events together while holding the lock. diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 5d6b1434340..a26e9fc4db5 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -388,42 +388,48 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { func TestVStreamRetriableErrors(t *testing.T) { type testCase struct { - name string - code vtrpcpb.Code - msg string - shouldRetry bool + name string + code vtrpcpb.Code + msg string + shouldRetry bool + ignoreTablet bool } tcases := []testCase{ { - name: "failed precondition", - code: vtrpcpb.Code_FAILED_PRECONDITION, - msg: "", - shouldRetry: true, + name: "failed precondition", + code: vtrpcpb.Code_FAILED_PRECONDITION, + msg: "", + shouldRetry: true, + ignoreTablet: false, }, { - name: "gtid mismatch", - code: vtrpcpb.Code_INVALID_ARGUMENT, - msg: "GTIDSet Mismatch aa", - shouldRetry: true, + name: "gtid mismatch", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "GTIDSet Mismatch aa", + shouldRetry: true, + ignoreTablet: true, }, { - name: "not found", - code: vtrpcpb.Code_NOT_FOUND, - msg: "", - shouldRetry: true, + name: "not found", + code: vtrpcpb.Code_NOT_FOUND, + msg: "", + shouldRetry: true, + ignoreTablet: true, }, { - name: "unavailable", - code: vtrpcpb.Code_UNAVAILABLE, - msg: "", - shouldRetry: true, + name: "unavailable", + code: vtrpcpb.Code_UNAVAILABLE, + msg: "", + shouldRetry: true, + ignoreTablet: false, }, { - name: "should not retry", - code: vtrpcpb.Code_INVALID_ARGUMENT, - msg: "final error", - shouldRetry: false, + name: "should not retry", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "final error", + shouldRetry: false, + ignoreTablet: false, }, } @@ -454,9 +460,16 @@ func TestVStreamRetriableErrors(t *testing.T) { addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc1.Tablet()) vsm := newTestVStreamManager(ctx, hc, st, cells[0]) + // always have the local cell tablet error so it's ignored on retry and we pick the other one + // if the error requires ignoring the tablet on retry sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) - sbc1.AddVStreamEvents(commit, nil) + + if tcase.ignoreTablet { + sbc1.AddVStreamEvents(commit, nil) + } else { + sbc0.AddVStreamEvents(commit, nil) + } vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{