Skip to content

Commit

Permalink
[release-19.0] VReplication: Take replication lag into account in VSt…
Browse files Browse the repository at this point in the history
…reamManager healthcheck result processing (#15761) (#15774)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
vitess-bot[bot] and mattlord authored Apr 22, 2024
1 parent 4ec2bcf commit cc64915
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 10 deletions.
23 changes: 14 additions & 9 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -531,18 +532,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
go func() {
_ = tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
var err error
if ctx.Err() != nil {
switch {
case ctx.Err() != nil:
err = fmt.Errorf("context has ended")
} else if shr == nil || shr.RealtimeStats == nil || shr.Target == nil {
err = fmt.Errorf("health check failed")
} else if vs.tabletType != shr.Target.TabletType {
err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream",
vs.tabletType, shr.Target.TabletType)
} else if shr.RealtimeStats.HealthError != "" {
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
case vs.tabletType != shr.Target.TabletType:
err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
case shr.RealtimeStats.HealthError != "":
err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream",
tablet.Alias, shr.RealtimeStats.HealthError)
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError)
case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()):
err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag())
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
errCh <- err
return err
}
Expand Down Expand Up @@ -573,7 +579,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
case <-ctx.Done():
return ctx.Err()
case streamErr := <-errCh:
log.Warningf("Tablet state changed: %s, attempting to restart", streamErr)
return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
case <-journalDone:
// Unreachable.
Expand Down
125 changes: 125 additions & 0 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ import (

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -1279,6 +1283,127 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
}
}

// TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by
// the vstream manager to confirm that we are correctly restarting the vstream when we should.
//
// Each test case installs a StreamHealthResponse on the sandbox tablet connection, starts a
// vstream against it, and then checks the captured warning log for the expected restart reason.
func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) {
	ctx := utils.LeakCheckContext(t)

	// Capture the vstream warning log. Otherwise we need to re-implement the vstream error
	// handling in SandboxConn's implementation and then we're not actually testing the
	// production code.
	logger := logutil.NewMemoryLogger()
	// log.Warningf is package-level state, so put the original back when we are done to
	// avoid redirecting the warnings of every test that runs after this one.
	origWarningf := log.Warningf
	log.Warningf = logger.Warningf
	defer func() { log.Warningf = origWarningf }()

	cell := "aa"
	ks := "TestVStream"
	shard := "0"
	tabletType := topodatapb.TabletType_REPLICA
	_ = createSandbox(ks)
	hc := discovery.NewFakeHealthCheck(nil)
	st := getSandboxTopo(ctx, cell, ks, []string{shard})
	vsm := newTestVStreamManager(ctx, hc, st, cell)
	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{
			Keyspace: ks,
			Shard:    shard,
		}},
	}
	source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil)
	tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias)
	addTabletToSandboxTopo(t, ctx, st, ks, shard, source.Tablet())
	target := &querypb.Target{
		Cell:       cell,
		Keyspace:   ks,
		Shard:      shard,
		TabletType: tabletType,
	}
	// One second past the configured threshold, i.e. the smallest lag that must trigger a restart.
	highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1

	type testcase struct {
		name    string
		hcRes   *querypb.StreamHealthResponse
		wantErr string
	}
	testcases := []testcase{
		{
			name: "all healthy", // Will hit the context timeout
		},
		{
			name: "failure",
			hcRes: &querypb.StreamHealthResponse{
				TabletAlias: source.Tablet().Alias,
				Target:      nil, // This is seen as a healthcheck stream failure
			},
			wantErr: fmt.Sprintf("health check failed on %s", tabletAlias),
		},
		{
			name: "tablet type changed",
			hcRes: &querypb.StreamHealthResponse{
				TabletAlias: source.Tablet().Alias,
				Target: &querypb.Target{
					Cell:       cell,
					Keyspace:   ks,
					Shard:      shard,
					TabletType: topodatapb.TabletType_PRIMARY,
				},
				PrimaryTermStartTimestamp: time.Now().Unix(),
				RealtimeStats:             &querypb.RealtimeStats{},
			},
			wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s",
				tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()),
		},
		{
			name: "unhealthy",
			hcRes: &querypb.StreamHealthResponse{
				TabletAlias: source.Tablet().Alias,
				Target:      target,
				RealtimeStats: &querypb.RealtimeStats{
					HealthError: "unhealthy",
				},
			},
			wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias),
		},
		{
			name: "replication lag too high",
			hcRes: &querypb.StreamHealthResponse{
				TabletAlias: source.Tablet().Alias,
				Target:      target,
				RealtimeStats: &querypb.RealtimeStats{
					ReplicationLagSeconds: highLag,
				},
			},
			wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided",
				tabletAlias, highLag),
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			// Start every case from an empty log so a match can only come from this case.
			defer logger.Clear()

			// Install the response before the stream starts: SandboxConn.StreamHealth invokes
			// its callback at most once per call, so setting it afterwards would race with the
			// vstream. A nil hcRes resets the connection to its default healthy behavior, which
			// keeps the cases independent of their order.
			source.SetStreamHealthResponse(tc.hcRes)

			done := make(chan struct{})
			go func() {
				defer close(done)
				sctx, cancel := context.WithTimeout(ctx, 5*time.Second)
				defer cancel()
				// SandboxConn's VStream implementation always waits for the context to timeout.
				err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error {
					// t.Errorf rather than require.Fail: FailNow must only be called from the
					// goroutine running the test function.
					t.Errorf("unexpected event: Received unexpected events: %v", events)
					return nil
				})
				// Without a wanted error we simply expect the context to timeout.
				if tc.wantErr != "" && !strings.Contains(logger.String(), tc.wantErr) {
					t.Errorf("unexpected vstream error: vstream ended with error: %v, and the captured warning log did not contain: %s", err, tc.wantErr)
				}
			}()
			<-done
		})
	}
}

func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
gw := NewTabletGateway(ctx, hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
Expand Down
17 changes: 16 additions & 1 deletion go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type SandboxConn struct {
getSchemaResult []map[string]string

parser *sqlparser.Parser

streamHealthResponse *querypb.StreamHealthResponse
}

var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check
Expand Down Expand Up @@ -456,8 +458,21 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target,
// SandboxSQRowCount is the default number of fake splits returned.
var SandboxSQRowCount = int64(10)

// StreamHealth always mocks a "healthy" result.
// SetStreamHealthResponse stores res as the response that StreamHealth passes to its callback.
// While the stored response is nil, StreamHealth returns nil without invoking the callback.
func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) {
	// The field is guarded by mapMu, the same mutex StreamHealth takes to read it.
	sbc.mapMu.Lock()
	sbc.streamHealthResponse = res
	sbc.mapMu.Unlock()
}

// StreamHealth mocks a "healthy" result by default: when no response has been configured it
// returns nil without invoking callback. To override this behavior call
// SetStreamHealthResponse; the configured response is then passed to callback exactly once
// and the callback's error is returned.
func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
	// Snapshot the configured response under the lock, then release the lock before running
	// the callback. The callback is caller-supplied code that may block or call back into
	// this SandboxConn, and doing either while holding mapMu could deadlock.
	sbc.mapMu.Lock()
	res := sbc.streamHealthResponse
	sbc.mapMu.Unlock()

	if res == nil {
		return nil
	}
	return callback(res)
}

Expand Down

0 comments on commit cc64915

Please sign in to comment.