Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VStreamer: add throttled logs when row/result/vstreamers get throttled. #14936

Merged
merged 3 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type Engine struct {
throttlerClient *throttle.Client
}

const throttledLoggerInterval = 5 * time.Minute

// NewEngine creates a new Engine.
// Initialization sequence is: NewEngine->InitDBConfig->Open.
// Open and Close can be called multiple times and are idempotent.
Expand Down Expand Up @@ -149,6 +151,10 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot
return vse
}

func (vse *Engine) GetTabletInfo() string {
return fmt.Sprintf("%s/%s/%s", vse.cell, vse.keyspace, vse.shard)
}

// InitDBConfig initializes the target parameters for the Engine.
func (vse *Engine) InitDBConfig(keyspace, shard string) {
vse.keyspace = keyspace
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// resultStreamer streams the results of the requested query
Expand Down Expand Up @@ -97,6 +99,8 @@

response := &binlogdatapb.VStreamResultsResponse{}
byteCount := 0
loggerName := fmt.Sprintf("%s (%v)", rs.vse.GetTabletInfo(), rs.tableName)
logger := logutil.NewThrottledLogger(loggerName, throttledLoggerInterval)
for {
select {
case <-rs.ctx.Done():
Expand All @@ -106,6 +110,7 @@

// check throttler.
if !rs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(rs.ctx, throttlerapp.ResultStreamerName) {
logger.Infof("throttled.")

Check warning on line 113 in go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go#L113

Added line #L113 was not covered by tests
continue
}

Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var (
Expand Down Expand Up @@ -391,6 +393,7 @@
filtered := make([]sqltypes.Value, len(rs.plan.ColExprs))
lastpk := make([]sqltypes.Value, len(rs.pkColumns))
byteCount := 0
logger := logutil.NewThrottledLogger(rs.vse.GetTabletInfo(), throttledLoggerInterval)
for {
if rs.ctx.Err() != nil {
log.Infof("Stream ended because of ctx.Done")
Expand All @@ -402,6 +405,7 @@
throttleResponseRateLimiter.Do(func() error {
return safeSend(&binlogdatapb.VStreamRowsResponse{Throttled: true})
})
logger.Infof("throttled.")

Check warning on line 408 in go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go#L408

Added line #L408 was not covered by tests
continue
}

Expand Down
14 changes: 9 additions & 5 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,25 @@

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql"
mysqlbinlog "vitess.io/vitess/go/mysql/binlog"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/binlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
vtschema "vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

mysqlbinlog "vitess.io/vitess/go/mysql/binlog"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
vtschema "vitess.io/vitess/go/vt/schema"
)

const (
Expand Down Expand Up @@ -299,6 +301,7 @@
}
}

logger := logutil.NewThrottledLogger(vs.vse.GetTabletInfo(), throttledLoggerInterval)
throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime)
defer throttledHeartbeatsRateLimiter.Stop()
Expand All @@ -316,6 +319,7 @@
return injectHeartbeat(true)
})
// we won't process events, until we're no longer throttling
logger.Infof("throttled.")

Check warning on line 322 in go/vt/vttablet/tabletserver/vstreamer/vstreamer.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/vstreamer/vstreamer.go#L322

Added line #L322 was not covered by tests
continue
}
select {
Expand Down
Loading