-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
VReplication: Estimate lag when workflow fully throttled #16577
Changes from all commits
e7f0c45
f1826cc
58605b3
a163ec9
5914378
9dd061a
77c1d1a
ee25994
001b4fb
9d2a09f
fb8741e
919056a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,6 @@ import ( | |
"vitess.io/vitess/go/mysql/collations" | ||
"vitess.io/vitess/go/mysql/replication" | ||
"vitess.io/vitess/go/sqltypes" | ||
"vitess.io/vitess/go/timer" | ||
"vitess.io/vitess/go/vt/binlog" | ||
"vitess.io/vitess/go/vt/dbconfigs" | ||
"vitess.io/vitess/go/vt/log" | ||
|
@@ -288,11 +287,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog | |
defer hbTimer.Stop() | ||
|
||
injectHeartbeat := func(throttled bool, throttledReason string) error { | ||
now := time.Now().UnixNano() | ||
select { | ||
case <-ctx.Done(): | ||
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") | ||
default: | ||
now := time.Now().UnixNano() | ||
err := bufferAndTransmit(&binlogdatapb.VEvent{ | ||
Type: binlogdatapb.VEventType_HEARTBEAT, | ||
Timestamp: now / 1e9, | ||
|
@@ -305,24 +304,22 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog | |
} | ||
|
||
logger := logutil.NewThrottledLogger(vs.vse.GetTabletInfo(), throttledLoggerInterval) | ||
wfNameLog := "" | ||
if vs.filter != nil && vs.filter.WorkflowName != "" { | ||
wfNameLog = fmt.Sprintf(" in workflow %s", vs.filter.WorkflowName) | ||
} | ||
throttleEvents := func(throttledEvents chan mysql.BinlogEvent) { | ||
throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime) | ||
defer throttledHeartbeatsRateLimiter.Stop() | ||
for { | ||
// check throttler. | ||
// Check throttler. | ||
if checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp); !ok { | ||
// make sure to leave if context is cancelled | ||
// Make sure to leave if context is cancelled. | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
default: | ||
// do nothing special | ||
// Do nothing special. | ||
} | ||
throttledHeartbeatsRateLimiter.Do(func() error { | ||
return injectHeartbeat(true, checkResult.Summary()) | ||
}) | ||
// we won't process events, until we're no longer throttling | ||
logger.Infof("throttled.") | ||
logger.Infof("vstreamer throttled%s: %s.", wfNameLog, checkResult.Summary()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this spam the logs when throttled? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a throttled logger that logs the message at most once every 5 minutes. It was added in #14936. I thought about removing these altogether as the value was not clear to me, but instead I at least made this one much more useful IMO as it would state the component and the workflow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK cool! |
||
continue | ||
} | ||
select { | ||
|
@@ -394,7 +391,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog | |
case <-ctx.Done(): | ||
return nil | ||
case <-hbTimer.C: | ||
if err := injectHeartbeat(false, ""); err != nil { | ||
checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOK(ctx, vs.throttlerApp) | ||
if err := injectHeartbeat(!ok, checkResult.Summary()); err != nil { | ||
if err == io.EOF { | ||
return nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized that we were getting double the heartbeats as we injected them here and via the timer. I consolidated them into the main loop with the timer. Otherwise we needed to do twice the work as the other heartbeats would get interspersed with these and those would indicate that we weren't throttled when in fact we were — so we needed to check the throttler there as well and reset the heartbeat timer in
injectHeartbeat()
to limit the extra heartbeats and even then some extras still went through as it was a race.