Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-17.0] tabletserver: do not consolidate streams on primary tablet when consolidator mode is notOnPrimary (#14332) #14678

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 81 additions & 57 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,64 +116,88 @@ func TestDisableConsolidator(t *testing.T) {
}

func TestConsolidatorReplicasOnly(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg.Done()
}()
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg.Done()
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
defer revert()

// primary should not do query consolidation
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg2.Done()
}()
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg2.Done()
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
require.NoError(t, err)
defer func() {
err = client.SetServingType(topodatapb.TabletType_PRIMARY)
require.NoError(t, err)
}()
type executeFn func(
query string, bindvars map[string]*querypb.BindVariable,
) (*sqltypes.Result, error)

testCases := []struct {
name string
getExecuteFn func(qc *framework.QueryClient) executeFn
totalConsolidationsTag string
}{
{
name: "Execute",
getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.Execute },
totalConsolidationsTag: "Waits/Histograms/Consolidations/Count",
},
{
name: "StreamExecute",
getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.StreamExecute },
totalConsolidationsTag: "Waits/Histograms/StreamConsolidations/Count",
},
}

initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg3 sync.WaitGroup
wg3.Add(2)
go func() {
client.Execute("select sleep(0.5) from dual", nil)
wg3.Done()
}()
go func() {
client.Execute("select sleep(0.5) from dual", nil)
wg3.Done()
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
initial := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg.Done()
}()
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg.Done()
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
defer revert()

// primary should not do query consolidation
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg2.Done()
}()
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg2.Done()
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
require.NoError(t, err)
defer func() {
err = client.SetServingType(topodatapb.TabletType_PRIMARY)
require.NoError(t, err)
}()

initial = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
var wg3 sync.WaitGroup
wg3.Add(2)
go func() {
testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
wg3.Done()
}()
go func() {
testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
wg3.Done()
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
})
}
}

func TestQueryPlanCache(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {

if consolidator := qre.tsv.qe.streamConsolidator; consolidator != nil {
if qre.connID == 0 && qre.plan.PlanID == p.PlanSelectStream && qre.shouldConsolidate() {
return consolidator.Consolidate(qre.logStats, sqlWithoutComments, callback,
return consolidator.Consolidate(qre.tsv.stats.WaitTimings, qre.logStats, sqlWithoutComments, callback,
func(callback StreamCallback) error {
dbConn, err := qre.getStreamConn()
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletserver/stream_consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package tabletserver
import (
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/sqltypes"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
Expand Down Expand Up @@ -70,7 +72,7 @@ func (sc *StreamConsolidator) SetBlocking(block bool) {
// `callback`. A `leaderCallback` must also be supplied: this function must perform the actual
// query in the upstream MySQL server, yielding results into the modified callback that it receives
// as an argument.
func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
var (
inflight *streamInFlight
catchup []*sqltypes.Result
Expand Down Expand Up @@ -100,9 +102,11 @@ func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql stri

// if we have a followChan, we're following up on a query that is already being served
if followChan != nil {
startTime := time.Now()
defer func() {
memchange := inflight.unfollow(followChan, sc.cleanup)
atomic.AddInt64(&sc.memory, memchange)
waitTimings.Record("StreamConsolidations", startTime)
}()

logStats.QuerySources |= tabletenv.QuerySourceConsolidator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -123,10 +124,12 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string

go func(worker int) {
defer wg.Done()
exporter := servenv.NewExporter("ConsolidatorTest", "")
timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations")
logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation")
query, callback := generateCallback(worker)
start := time.Now()
err := ct.cc.Consolidate(logStats, query, func(result *sqltypes.Result) error {
err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error {
cr := ct.results[worker]
cr.items = append(cr.items, result)
atomic.AddInt64(&cr.count, 1)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ
ctx: ctx,
logStats: logStats,
tsv: tsv,
tabletType: target.GetTabletType(),
setting: connSetting,
}
return qre.Stream(callback)
Expand Down
Loading