Skip to content

Commit

Permalink
ignore tablet on certain errors only
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Oct 13, 2023
1 parent ce8797a commit a843d9d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 33 deletions.
21 changes: 13 additions & 8 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}
if !vs.isRetriableError(err) {

retry, ignoreTablet := vs.isRetriableError(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
ignoreTablets[tablet.Alias.String()] = tablet.GetAlias()
if ignoreTablet {
ignoreTablets[tablet.Alias.String()] = tablet.GetAlias()
}

errCount++
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
Expand All @@ -686,18 +691,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

func (vs *vstream) isRetriableError(err error) bool {
func (vs *vstream) isRetriableError(err error) (bool, bool) {
errCode := vterrors.Code(err)

if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE || errCode == vtrpcpb.Code_NOT_FOUND {
return true
if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE {
return true, false
}

if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true
if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || errCode == vtrpcpb.Code_NOT_FOUND {
return true, true
}

return false
return false, false
}

// sendAll sends a group of events together while holding the lock.
Expand Down
63 changes: 38 additions & 25 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,42 +388,48 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {

func TestVStreamRetriableErrors(t *testing.T) {
type testCase struct {
name string
code vtrpcpb.Code
msg string
shouldRetry bool
name string
code vtrpcpb.Code
msg string
shouldRetry bool
ignoreTablet bool
}

tcases := []testCase{
{
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
shouldRetry: true,
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
shouldRetry: true,
ignoreTablet: false,
},
{
name: "gtid mismatch",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "GTIDSet Mismatch aa",
shouldRetry: true,
name: "gtid mismatch",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "GTIDSet Mismatch aa",
shouldRetry: true,
ignoreTablet: true,
},
{
name: "not found",
code: vtrpcpb.Code_NOT_FOUND,
msg: "",
shouldRetry: true,
name: "not found",
code: vtrpcpb.Code_NOT_FOUND,
msg: "",
shouldRetry: true,
ignoreTablet: true,
},
{
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "",
shouldRetry: true,
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "",
shouldRetry: true,
ignoreTablet: false,
},
{
name: "should not retry",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldRetry: false,
name: "should not retry",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldRetry: false,
ignoreTablet: false,
},
}

Expand Down Expand Up @@ -454,9 +460,16 @@ func TestVStreamRetriableErrors(t *testing.T) {
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc1.Tablet())

vsm := newTestVStreamManager(ctx, hc, st, cells[0])

// always have the local cell tablet error so it's ignored on retry and we pick the other one
// if the error requires ignoring the tablet on retry
sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg))
sbc1.AddVStreamEvents(commit, nil)

if tcase.ignoreTablet {
sbc1.AddVStreamEvents(commit, nil)
} else {
sbc0.AddVStreamEvents(commit, nil)
}

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Expand Down

0 comments on commit a843d9d

Please sign in to comment.