From 5929d429813c31005f44c7083dd72ef3d7b9518a Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 5 Dec 2023 04:35:39 -0500 Subject: [PATCH] tabletserver: do not consolidate streams on primary tablet when consolidator mode is `notOnPrimary` (#14332) --- changelog/19.0/19.0.0/summary.md | 8 + go/vt/vttablet/endtoend/config_test.go | 138 ++++++++++-------- go/vt/vttablet/tabletserver/query_executor.go | 2 +- .../tabletserver/stream_consolidator.go | 6 +- .../stream_consolidator_flaky_test.go | 5 +- go/vt/vttablet/tabletserver/tabletserver.go | 1 + 6 files changed, 100 insertions(+), 60 deletions(-) diff --git a/changelog/19.0/19.0.0/summary.md b/changelog/19.0/19.0.0/summary.md index af9c83399a3..2b5fdfe3d65 100644 --- a/changelog/19.0/19.0.0/summary.md +++ b/changelog/19.0/19.0.0/summary.md @@ -8,6 +8,8 @@ - [VTTablet Flags](#vttablet-flags) - **[Docker](#docker)** - [New MySQL Image](#mysql-image) + - **[New Stats](#new-stats)** + - [Stream Consolidations](#stream-consolidations) - **[VTGate](#vtgate)** - [`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable](#fk-checks-vitess-aware) - **[Query Compatibility](#query-compatibility)** @@ -43,6 +45,12 @@ This lightweight image is a replacement of `vitess/lite` to only run `mysqld`. Several tags are available to let you choose what version of MySQL you want to use: `vitess/mysql:8.0.30`, `vitess/mysql:8.0.34`. +### new stats + +#### Stream Consolidations + +Prior to 19.0 VTTablet reported how much time non-streaming executions spend waiting for consolidations to occur. In 19.0, VTTablet reports a similar stat for streaming executions in `/debug/vars` stat `Waits.Histograms.StreamConsolidations`. + ### VTGate #### `FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index 60303cf4bf5..9eef54bd0bb 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -108,64 +108,88 @@ func TestDisableConsolidator(t *testing.T) { } func TestConsolidatorReplicasOnly(t *testing.T) { - totalConsolidationsTag := "Waits/Histograms/Consolidations/Count" - initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - var wg sync.WaitGroup - wg.Add(2) - go func() { - framework.NewClient().Execute("select sleep(0.5) from dual", nil) - wg.Done() - }() - go func() { - framework.NewClient().Execute("select sleep(0.5) from dual", nil) - wg.Done() - }() - wg.Wait() - afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - assert.Equal(t, initial+1, afterOne, "expected one consolidation") - - revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary) - defer revert() - - // primary should not do query consolidation - var wg2 sync.WaitGroup - wg2.Add(2) - go func() { - framework.NewClient().Execute("select sleep(0.5) from dual", nil) - wg2.Done() - }() - go func() { - framework.NewClient().Execute("select sleep(0.5) from dual", nil) - wg2.Done() - }() - wg2.Wait() - noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations") - - // become a replica, where query consolidation should happen - client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA) - - err := client.SetServingType(topodatapb.TabletType_REPLICA) - require.NoError(t, err) - defer func() { - err = client.SetServingType(topodatapb.TabletType_PRIMARY) - require.NoError(t, err) - }() + type executeFn func( + query string, bindvars map[string]*querypb.BindVariable, + ) (*sqltypes.Result, error) + + testCases := []struct { + name string + getExecuteFn func(qc *framework.QueryClient) executeFn + totalConsolidationsTag string + }{ + { + name: "Execute", + getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.Execute }, + totalConsolidationsTag: "Waits/Histograms/Consolidations/Count", + }, + { + name: "StreamExecute", + getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.StreamExecute }, + totalConsolidationsTag: "Waits/Histograms/StreamConsolidations/Count", + }, + } - initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - var wg3 sync.WaitGroup - wg3.Add(2) - go func() { - client.Execute("select sleep(0.5) from dual", nil) - wg3.Done() - }() - go func() { - client.Execute("select sleep(0.5) from dual", nil) - wg3.Done() - }() - wg3.Wait() - afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - assert.Equal(t, initial+1, afterOne, "expected another consolidation") + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + initial := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + var wg sync.WaitGroup + wg.Add(2) + go func() { + testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil) + wg.Done() + }() + go func() { + testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil) + wg.Done() + }() + wg.Wait() + afterOne := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + assert.Equal(t, initial+1, afterOne, "expected one consolidation") + + revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary) + defer revert() + + // primary should not do query consolidation + var wg2 sync.WaitGroup + wg2.Add(2) + go func() { + testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil) + wg2.Done() + }() + go func() { + testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil) + wg2.Done() + }() + wg2.Wait() + noNewConsolidations := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations") + + // become a replica, where query consolidation should happen + client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA) + + err := client.SetServingType(topodatapb.TabletType_REPLICA) + require.NoError(t, err) + defer func() { + err = client.SetServingType(topodatapb.TabletType_PRIMARY) + require.NoError(t, err) + }() + + initial = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + var wg3 sync.WaitGroup + wg3.Add(2) + go func() { + testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil) + wg3.Done() + }() + go func() { + testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil) + wg3.Done() + }() + wg3.Wait() + afterOne = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + assert.Equal(t, initial+1, afterOne, "expected another consolidation") + }) + } } func TestQueryPlanCache(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 63dcd42d0a8..735562c536f 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -340,7 +340,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error { if consolidator := qre.tsv.qe.streamConsolidator; consolidator != nil { if qre.connID == 0 && qre.plan.PlanID == p.PlanSelectStream && qre.shouldConsolidate() { - return consolidator.Consolidate(qre.logStats, sqlWithoutComments, callback, + return consolidator.Consolidate(qre.tsv.stats.WaitTimings, qre.logStats, sqlWithoutComments, callback, func(callback StreamCallback) error { dbConn, err := qre.getStreamConn() if err != nil { diff --git a/go/vt/vttablet/tabletserver/stream_consolidator.go b/go/vt/vttablet/tabletserver/stream_consolidator.go index 9f720059dce..cbf99eaffd4 100644 --- a/go/vt/vttablet/tabletserver/stream_consolidator.go +++ b/go/vt/vttablet/tabletserver/stream_consolidator.go @@ -19,9 +19,11 @@ package tabletserver import ( "sync" "sync/atomic" + "time" "vitess.io/vitess/go/sqltypes" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) @@ -70,7 +72,7 @@ func (sc *StreamConsolidator) SetBlocking(block bool) { // `callback`. A `leaderCallback` must also be supplied: this function must perform the actual // query in the upstream MySQL server, yielding results into the modified callback that it receives // as an argument. -func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error { +func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error { var ( inflight *streamInFlight catchup []*sqltypes.Result @@ -100,9 +102,11 @@ func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql stri // if we have a followChan, we're following up on a query that is already being served if followChan != nil { + startTime := time.Now() defer func() { memchange := inflight.unfollow(followChan, sc.cleanup) atomic.AddInt64(&sc.memory, memchange) + waitTimings.Record("StreamConsolidations", startTime) }() logStats.QuerySources |= tabletenv.QuerySourceConsolidator diff --git a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go index 0c903933412..caa519cc477 100644 --- a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go +++ b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/sqltypes" @@ -123,10 +124,12 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string go func(worker int) { defer wg.Done() + exporter := servenv.NewExporter("ConsolidatorTest", "") + timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations") logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation") query, callback := generateCallback(worker) start := time.Now() - err := ct.cc.Consolidate(logStats, query, func(result *sqltypes.Result) error { + err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error { cr := ct.results[worker] cr.items = append(cr.items, result) atomic.AddInt64(&cr.count, 1) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 3b0ac598ad0..12c6a40868f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -948,6 +948,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ ctx: ctx, logStats: logStats, tsv: tsv, + tabletType: target.GetTabletType(), setting: connSetting, } return qre.Stream(callback)