diff --git a/go/test/endtoend/vreplication/fk_config_test.go b/go/test/endtoend/vreplication/fk_config_test.go index 97d868cb371..822d44105b4 100644 --- a/go/test/endtoend/vreplication/fk_config_test.go +++ b/go/test/endtoend/vreplication/fk_config_test.go @@ -67,7 +67,7 @@ insert into t2 values(1, 1, 't21'), (2, 1, 't22'), (3, 2, 't23'); } ] }, - "t1": { + "t1": { "column_vindexes": [ { "column": "id", @@ -75,7 +75,7 @@ insert into t2 values(1, 1, 't21'), (2, 1, 't22'), (3, 2, 't23'); } ] }, - "t2": { + "t2": { "column_vindexes": [ { "column": "t1id", diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 5d6cbb7b94c..e6133e9e0ff 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -283,10 +283,10 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result { // constraints, where the parent table is lexicographically sorted before the child table and // thus may be dropped first, can be successfully cancelled. func testFKCancel(t *testing.T, vc *VitessCluster) { - var targetKeyspace = "fktarget" - var sourceKeyspace = "fksource" - var workflowName = "wf2" - var ksWorkflow = fmt.Sprintf("%s.%s", targetKeyspace, workflowName) + targetKeyspace := "fktarget" + sourceKeyspace := "fksource" + workflowName := "wf2" + ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName) mt := newMoveTables(vc, &moveTablesWorkflow{ workflowInfo: &workflowInfo{ vc: vc, diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 4e50ea12af3..c28118a97cc 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -22,6 +22,7 @@ import ( "io" "net/http" "runtime" + "strconv" "strings" "sync" "testing" @@ -1155,7 +1156,7 @@ func materialize(t *testing.T, spec string, useVtctldClient bool) { func materializeProduct(t *testing.T, useVtctldClient bool) { t.Run("materializeProduct", func(t *testing.T) { - // materializing from "product" keyspace to "customer" keyspace + // Materializing from "product" keyspace to "customer" keyspace. workflow := "cproduct" keyspace := "customer" defaultCell := vc.Cells[vc.CellNames[0]] @@ -1169,7 +1170,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "primary") t.Run("throttle-app-product", func(t *testing.T) { - // Now, throttle the streamer on source tablets, insert some rows + // Now, throttle the source side component (vstreamer), and insert some rows. for _, tab := range productTablets { body, err := throttleApp(tab, sourceThrottlerAppName) assert.NoError(t, err) @@ -1180,19 +1181,33 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusNotThrottled) } insertMoreProductsForSourceThrottler(t) - // To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place, - // we expect the additional rows to **not appear** in the materialized view + // To be fair to the test, we give the target time to apply the new changes. We + // expect it to NOT get them in the first place, we expect the additional rows + // to **not appear** in the materialized view. for _, tab := range customerTablets { waitForRowCountInTablet(t, tab, keyspace, workflow, 5) + // Confirm that we updated the stats on the target tablets as expected. + jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2} + vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int() + require.Greater(t, vstreamerThrottledCount, int64(0)) + // We only need to do this stat check once. + val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) + require.NoError(t, err) + throttledCount, err := strconv.ParseInt(val, 10, 64) + require.NoError(t, err) + require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount) } }) t.Run("unthrottle-app-product", func(t *testing.T) { - // unthrottle on source tablets, and expect the rows to show up + // Unthrottle the vstreamer component, and expect the rows to show up. for _, tab := range productTablets { body, err := unthrottleApp(tab, sourceThrottlerAppName) assert.NoError(t, err) assert.Contains(t, body, sourceThrottlerAppName) - // give time for unthrottling to take effect and for target to fetch data + // Give time for unthrottling to take effect and for targets to fetch data. waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled) } for _, tab := range customerTablets { @@ -1201,8 +1216,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { }) t.Run("throttle-app-customer", func(t *testing.T) { - // Now, throttle vreplication (vcopier/vapplier) on target tablets, and - // insert some more rows. + // Now, throttle vreplication on the target side (vplayer), and insert some + // more rows. for _, tab := range customerTablets { body, err := throttleApp(tab, targetThrottlerAppName) assert.NoError(t, err) @@ -1217,6 +1232,13 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { // rows to **not appear** in the materialized view. for _, tab := range customerTablets { waitForRowCountInTablet(t, tab, keyspace, workflow, 8) + // Confirm that we updated the stats on the target tablets as expected. + jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} + vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int() + require.Greater(t, vplayerThrottledCount, int64(0)) } }) t.Run("unthrottle-app-customer", func(t *testing.T) { diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 88c4093f451..6bdc2d70d2d 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -105,6 +105,8 @@ type Stats struct { PartialQueryCount *stats.CountersWithMultiLabels PartialQueryCacheSize *stats.CountersWithMultiLabels + + ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component } // RecordHeartbeat updates the time the last heartbeat from vstreamer was seen @@ -174,6 +176,7 @@ func NewStats() *Stats { bps.TableCopyTimings = stats.NewTimings("", "", "Table") bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"}) + bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"}) return bps } @@ -369,13 +372,14 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error { if backoff == throttler.NotThrottled { break } + blp.blplStats.ThrottledCounts.Add([]string{"trx", "binlogplayer"}, 1) // We don't bother checking for context cancellation here because the // sleep will block only up to 1 second. (Usually, backoff is 1s / rate // e.g. a rate of 1000 TPS results into a backoff of 1 ms.) time.Sleep(backoff) } - // get the response + // Get the response. response, err := stream.Recv() // Check context before checking error, because canceled // contexts could be wrapped as regular errors. diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index 3ceee09d5f7..27be6442cfe 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -281,7 +281,10 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba } sct.topoServer.UpdateSrvVSchema(ctx, cell, srvVSchema) - current, updateChan, _ := sct.topoServer.WatchSrvVSchema(ctx, cell) + current, updateChan, err := sct.topoServer.WatchSrvVSchema(ctx, cell) + if err != nil { + panic(fmt.Sprintf("sandboxTopo WatchSrvVSchema returned an error: %v", err)) + } if !callback(current.Value, nil) { panic("sandboxTopo callback returned false") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index 892247efee0..5b5b6ede24c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -59,10 +59,12 @@ type vrStats struct { mu sync.Mutex isOpen bool controllers map[int32]*controller + + ThrottledCount *stats.Counter } func (st *vrStats) register() { - + st.ThrottledCount = stats.NewCounter("", "") stats.NewGaugeFunc("VReplicationStreamCount", "Number of vreplication streams", st.numControllers) stats.NewGaugeFunc("VReplicationLagSecondsMax", "Max vreplication seconds behind primary", st.maxReplicationLagSeconds) stats.NewStringMapFuncWithMultiLabels( @@ -502,6 +504,29 @@ func (st *vrStats) register() { return result }) + stats.NewCounterFunc( + "VReplicationThrottledCountTotal", + "The total number of times that vreplication has been throttled", + func() int64 { + st.mu.Lock() + defer st.mu.Unlock() + return st.ThrottledCount.Get() + }) + stats.NewCountersFuncWithMultiLabels( + "VReplicationThrottledCounts", + "The number of times vreplication was throttled by workflow, id, throttler (trx or tablet), and the sub-component that was throttled", + []string{"workflow", "id", "throttler", "component"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64) + for _, ct := range st.controllers { + for key, val := range ct.blpStats.ThrottledCounts.Counts() { + result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val + } + } + return result + }) } func (st *vrStats) numControllers() int64 { diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 79149d34d6d..d94802adb7b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" @@ -132,7 +133,9 @@ func TestStatusHtml(t *testing.T) { func TestVReplicationStats(t *testing.T) { blpStats := binlogplayer.NewStats() defer blpStats.Stop() - testStats := &vrStats{} + testStats := &vrStats{ + ThrottledCount: stats.NewCounter("", ""), + } testStats.isOpen = true testStats.controllers = map[int32]*controller{ 1: { @@ -184,6 +187,14 @@ func TestVReplicationStats(t *testing.T) { require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount) require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount) + testStats.ThrottledCount.Add(99) + require.Equal(t, int64(99), testStats.ThrottledCount.Get()) + + blpStats.ThrottledCounts.Add([]string{"tablet", "vcopier"}, 10) + blpStats.ThrottledCounts.Add([]string{"tablet", "vplayer"}, 80) + require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"]) + require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"]) + var tm int64 = 1234567890 blpStats.RecordHeartbeat(tm) require.Equal(t, tm, blpStats.Heartbeat()) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 67f4815b2a1..daa642c53af 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -580,10 +580,21 @@ func (vr *vreplicator) throttlerAppName() string { return throttlerapp.Concatenate(names...) } +// updateTimeThrottled updates the time_throttled field in the _vt.vreplication record +// with a rate limit so that it's only saved in the database at most once per +// throttleUpdatesRateLimiter.tickerTime. +// It also increments the throttled count in the stats to keep track of how many +// times a VReplication workflow, and the specific sub-component, is throttled by the +// tablet throttler over time. It also increments the global throttled count to keep +// track of how many times in total vreplication has been throttled across all workflows +// (both ones that currently exist and ones that no longer do). func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error { + appName := appThrottled.String() + vr.stats.ThrottledCounts.Add([]string{"tablet", appName}, 1) + globalStats.ThrottledCount.Add(1) err := vr.throttleUpdatesRateLimiter.Do(func() error { tm := time.Now().Unix() - update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String()) + update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appName) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go index cc586756fe8..0b479bd588c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go @@ -61,7 +61,7 @@ const ( ) func getDefaultCollationID() int64 { - return 45 + return 45 // utf8mb4_general_ci } var ( diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 5a1a786513f..4d1983cd4d3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -89,6 +89,12 @@ func TestNoBlob(t *testing.T) { env = nil newEngine(t, ctx, "noblob") defer func() { + if engine != nil { + engine.Close() + } + if env != nil { + env.Close() + } engine = oldEngine env = oldEnv }() @@ -1854,6 +1860,12 @@ func TestMinimalMode(t *testing.T) { env = nil newEngine(t, ctx, "minimal") defer func() { + if engine != nil { + engine.Close() + } + if env != nil { + env.Close() + } engine = oldEngine env = oldEnv }()