Skip to content

Commit

Permalink
tabletserver: do not consolidate streams on primary tablet when conso…
Browse files Browse the repository at this point in the history
…lidator mode is `notOnPrimary` (#14332)

Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander committed Dec 5, 2023
1 parent 3e40419 commit 9d379c4
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 60 deletions.
138 changes: 81 additions & 57 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,64 +108,88 @@ func TestDisableConsolidator(t *testing.T) {
}

func TestConsolidatorReplicasOnly(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg.Done()
}()
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg.Done()
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
defer revert()

// primary should not do query consolidation
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg2.Done()
}()
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg2.Done()
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
require.NoError(t, err)
defer func() {
err = client.SetServingType(topodatapb.TabletType_PRIMARY)
require.NoError(t, err)
}()
type executeFn func(
query string, bindvars map[string]*querypb.BindVariable,
) (*sqltypes.Result, error)

testCases := []struct {
name string
getExecuteFn func(qc *framework.QueryClient) executeFn
totalConsolidationsTag string
}{
{
name: "Execute",
getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.Execute },
totalConsolidationsTag: "Waits/Histograms/Consolidations/Count",
},
{
name: "StreamExecute",
getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.StreamExecute },
totalConsolidationsTag: "Waits/Histograms/StreamConsolidations/Count",
},
}

initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg3 sync.WaitGroup
wg3.Add(2)
go func() {
client.Execute("select sleep(0.5) from dual", nil)
wg3.Done()
}()
go func() {
client.Execute("select sleep(0.5) from dual", nil)
wg3.Done()
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
initial := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg.Done()
}()
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg.Done()
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
defer revert()

// primary should not do query consolidation
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg2.Done()
}()
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg2.Done()
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
require.NoError(t, err)
defer func() {
err = client.SetServingType(topodatapb.TabletType_PRIMARY)
require.NoError(t, err)
}()

initial = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
var wg3 sync.WaitGroup
wg3.Add(2)
go func() {
testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
wg3.Done()
}()
go func() {
testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
wg3.Done()
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
})
}
}

func TestQueryPlanCache(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {

if consolidator := qre.tsv.qe.streamConsolidator; consolidator != nil {
if qre.connID == 0 && qre.plan.PlanID == p.PlanSelectStream && qre.shouldConsolidate() {
return consolidator.Consolidate(qre.logStats, sqlWithoutComments, callback,
return consolidator.Consolidate(qre.tsv.stats.WaitTimings, qre.logStats, sqlWithoutComments, callback,
func(callback StreamCallback) error {
dbConn, err := qre.getStreamConn()
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletserver/stream_consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package tabletserver
import (
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/sqltypes"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
Expand Down Expand Up @@ -70,7 +72,7 @@ func (sc *StreamConsolidator) SetBlocking(block bool) {
// `callback`. A `leaderCallback` must also be supplied: this function must perform the actual
// query in the upstream MySQL server, yielding results into the modified callback that it receives
// as an argument.
func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
var (
inflight *streamInFlight
catchup []*sqltypes.Result
Expand Down Expand Up @@ -100,9 +102,11 @@ func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql stri

// if we have a followChan, we're following up on a query that is already being served
if followChan != nil {
startTime := time.Now()
defer func() {
memchange := inflight.unfollow(followChan, sc.cleanup)
atomic.AddInt64(&sc.memory, memchange)
waitTimings.Record("StreamConsolidations", startTime)
}()

logStats.QuerySources |= tabletenv.QuerySourceConsolidator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -123,10 +124,12 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string

go func(worker int) {
defer wg.Done()
exporter := servenv.NewExporter("ConsolidatorTest", "")
timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations")
logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation")
query, callback := generateCallback(worker)
start := time.Now()
err := ct.cc.Consolidate(logStats, query, func(result *sqltypes.Result) error {
err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error {
cr := ct.results[worker]
cr.items = append(cr.items, result)
atomic.AddInt64(&cr.count, 1)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ
ctx: ctx,
logStats: logStats,
tsv: tsv,
tabletType: target.GetTabletType(),
setting: connSetting,
}
return qre.Stream(callback)
Expand Down

0 comments on commit 9d379c4

Please sign in to comment.