Skip to content

Commit

Permalink
VReplication: Estimate lag when workflow fully throttled (#16577)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Aug 15, 2024
1 parent 1b7fb6f commit 95c77e9
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 47 deletions.
71 changes: 52 additions & 19 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var (
targetKsOpts = make(map[string]string)
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = throttlerapp.VStreamerName
targetThrottlerAppName = throttlerapp.VReplicationName
targetThrottlerAppName = throttlerapp.VPlayerName
)

const (
Expand Down Expand Up @@ -1228,18 +1228,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
require.Greater(t, vstreamerThrottledCount, int64(0))
// We only need to do this stat check once.
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
throttledCount, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
confirmVReplicationThrottling(t, tab, sourceKs, workflow, sourceThrottlerAppName)
}
})
t.Run("unthrottle-app-product", func(t *testing.T) {
Expand Down Expand Up @@ -1274,12 +1263,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
require.Greater(t, vplayerThrottledCount, int64(0))
confirmVReplicationThrottling(t, tab, sourceKs, workflow, targetThrottlerAppName)
}
})
t.Run("unthrottle-app-customer", func(t *testing.T) {
Expand Down Expand Up @@ -1709,3 +1693,52 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
execQuery(t, dbConn, "rollback")
}

// confirmVReplicationThrottling confirms that the throttling related metrics reflect that
// the workflow is being throttled as expected, via the expected app name, and that this
// is impacting the lag as expected.
// The tablet passed should be a target tablet for the given workflow while the keyspace
// name provided should be the source keyspace as the target tablet stats note the stream's
// source keyspace and shard.
func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) {
const (
sleepTime = 5 * time.Second
zv = int64(0)
)
time.Sleep(sleepTime) // To be sure that we accrue some lag

jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int()
require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal)

val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
require.NotEqual(t, "", val)
throttledCountTotal, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val)

// We do not calculate replication lag for the vcopier as it's not replicating
// events.
if appname != throttlerapp.VCopierName {
jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"product.0.cproduct.4": 6}
vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int()
require.NoError(t, err)
// Take off 1 second to deal with timing issues in the test.
minLagSecs := int64(int64(sleepTime.Seconds()) - 1)
require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal)

val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"})
require.NoError(t, err)
require.NotEqual(t, "", val)
vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val)
}
}
40 changes: 25 additions & 15 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,33 +476,34 @@ func (vp *vplayer) recordHeartbeat() error {
func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
defer vp.vr.dbClient.Rollback()

estimateLag := func() {
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
}

// If we're not running, set ReplicationLagSeconds to be very high.
// TODO(sougou): if we also stored the time of the last event, we
// can estimate this value more accurately.
defer vp.vr.stats.ReplicationLagSeconds.Store(math.MaxInt64)
defer vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), math.MaxInt64)
var sbm int64 = -1
var lagSecs int64
for {
if ctx.Err() != nil {
return ctx.Err()
}
// Check throttler.
if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok {
_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary())
estimateLag()
continue
}

items, err := relay.Fetch()
if err != nil {
return err
}
// No events were received. This likely means that there's a network partition.
// So, we should assume we're falling behind.
if len(items) == 0 {
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
}

// Empty transactions are saved at most once every idleTimeout.
// This covers two situations:
// 1. Fetch was idle for idleTimeout.
Expand All @@ -520,12 +521,20 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
}

lagSecs = -1
for i, events := range items {
for j, event := range events {
if event.Timestamp != 0 {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
sbm = event.CurrentTime/1e9 - event.Timestamp
// If the event is a heartbeat sent while throttled then do not update
// the lag based on it.
// If the batch consists only of throttled heartbeat events then we cannot
// determine the actual lag, as the vstreamer is fully throttled, and we
// will estimate it after processing the batch.
if !(event.Type == binlogdatapb.VEventType_HEARTBEAT && event.Throttled) {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
lagSecs = event.CurrentTime/1e9 - event.Timestamp
}
}
mustSave := false
switch event.Type {
Expand Down Expand Up @@ -566,11 +575,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
}

if sbm >= 0 {
vp.vr.stats.ReplicationLagSeconds.Store(sbm)
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(sbm)*time.Second)
if lagSecs >= 0 {
vp.vr.stats.ReplicationLagSeconds.Store(lagSecs)
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(lagSecs)*time.Second)
} else { // We couldn't determine the lag, so we need to estimate it
estimateLag()
}

}
}

Expand Down
24 changes: 11 additions & 13 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/binlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -288,11 +287,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
defer hbTimer.Stop()

injectHeartbeat := func(throttled bool, throttledReason string) error {
now := time.Now().UnixNano()
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
default:
now := time.Now().UnixNano()
err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
Expand All @@ -305,24 +304,22 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}

logger := logutil.NewThrottledLogger(vs.vse.GetTabletInfo(), throttledLoggerInterval)
wfNameLog := ""
if vs.filter != nil && vs.filter.WorkflowName != "" {
wfNameLog = fmt.Sprintf(" in workflow %s", vs.filter.WorkflowName)
}
throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime)
defer throttledHeartbeatsRateLimiter.Stop()
for {
// check throttler.
// Check throttler.
if checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp); !ok {
// make sure to leave if context is cancelled
// Make sure to leave if context is cancelled.
select {
case <-ctx.Done():
return
default:
// do nothing special
// Do nothing special.
}
throttledHeartbeatsRateLimiter.Do(func() error {
return injectHeartbeat(true, checkResult.Summary())
})
// we won't process events, until we're no longer throttling
logger.Infof("throttled.")
logger.Infof("vstreamer throttled%s: %s.", wfNameLog, checkResult.Summary())
continue
}
select {
Expand Down Expand Up @@ -394,7 +391,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case <-ctx.Done():
return nil
case <-hbTimer.C:
if err := injectHeartbeat(false, ""); err != nil {
checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOK(ctx, vs.throttlerApp)
if err := injectHeartbeat(!ok, checkResult.Summary()); err != nil {
if err == io.EOF {
return nil
}
Expand Down

0 comments on commit 95c77e9

Please sign in to comment.