Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option for warming reads to mirror primary read queries onto replicas from vtgates to warm bufferpools #13206

Merged
merged 17 commits into from
Oct 4, 2023
Merged
3 changes: 3 additions & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,9 @@ Flags:
--vtgate_grpc_server_name string the server name to use to validate server certificate
--vttablet_skip_buildinfo_tags string comma-separated list of buildinfo tags to skip from merging with --init_tags. each tag is either an exact match or a regular expression of the form '/regexp/'. (default "/.*/")
--wait_for_backup_interval duration (init restore parameter) if this is greater than 0, instead of starting up empty when no backups are found, keep checking at this interval for a backup to appear
--warming-reads-concurrency int Number of concurrent warming reads allowed (default 500)
--warming-reads-percent int Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm
--warming-reads-query-timeout duration Timeout of warming read queries (default 5s)
--warn_memory_rows int Warning threshold for in-memory results. A row count higher than this amount will cause the VtGateWarnings.ResultsExceeded counter to be incremented. (default 30000)
--warn_payload_size int The warning threshold for query payloads in bytes. A payload greater than this threshold will cause the VtGateWarnings.WarnPayloadSizeExceeded counter to be incremented.
--warn_sharded_only If any features that are only available in unsharded mode are used, query execution warnings will be added to the session
Expand Down
3 changes: 3 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ Flags:
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
--vschema_ddl_authorized_users string List of users authorized to execute vschema ddl operations, or '%' to allow all users.
--vtgate-config-terse-errors prevent bind vars from escaping in returned errors
--warming-reads-concurrency int Number of concurrent warming reads allowed (default 500)
--warming-reads-percent int Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm
--warming-reads-query-timeout duration Timeout of warming read queries (default 5s)
--warn_memory_rows int Warning threshold for in-memory results. A row count higher than this amount will cause the VtGateWarnings.ResultsExceeded counter to be incremented. (default 30000)
--warn_payload_size int The warning threshold for query payloads in bytes. A payload greater than this threshold will cause the VtGateWarnings.WarnPayloadSizeExceeded counter to be incremented.
--warn_sharded_only If any features that are only available in unsharded mode are used, query execution warnings will be added to the session
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShar
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
queryLogBufferSize := 10
plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0)
vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))

return nil
Expand Down
24 changes: 24 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ func (t *noopVCursor) ReleaseLock(context.Context) error {
panic("implement me")
}

func (t *noopVCursor) GetWarmingReadsPercent() int {
panic("implement me")
}

func (t *noopVCursor) GetWarmingReadsChannel() chan bool {
panic("implement me")
}

func (t *noopVCursor) CloneForReplicaWarming(ctx context.Context) VCursor {
panic("implement me")
}

func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) error {
panic("implement me")
}
Expand Down Expand Up @@ -481,6 +493,18 @@ func (f *loggingVCursor) RecordWarning(warning *querypb.QueryWarning) {
f.warnings = append(f.warnings, warning)
}

func (f *loggingVCursor) GetWarmingReadsPercent() int {
return 0
}

func (f *loggingVCursor) GetWarmingReadsChannel() chan bool {
return make(chan bool)
}

func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor {
return f
}

func (f *loggingVCursor) Execute(ctx context.Context, method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) {
name := "Unknown"
switch co {
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ type (

// ReleaseLock releases all the held advisory locks.
ReleaseLock(ctx context.Context) error

// GetWarmingReadsPercent gets the percentage of queries to clone to replicas for bufferpool warming
GetWarmingReadsPercent() int

// GetWarmingReadsChannel returns the channel for executing warming reads against replicas
GetWarmingReadsChannel() chan bool

// CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas
CloneForReplicaWarming(ctx context.Context) VCursor
}

// SessionActions gives primitives ability to interact with the session state
Expand Down
50 changes: 50 additions & 0 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package engine
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/sqltypes"
Expand All @@ -41,6 +43,13 @@ import (

var _ Primitive = (*Route)(nil)

var (
replicaWarmingReadsMirrored = stats.NewCountersWithMultiLabels(
"ReplicaWarmingReadsMirrored",
"Number of reads mirrored to replicas to warm their bufferpools",
[]string{"Keyspace"})
)

// Route represents the instructions to route a read query to
// one or many vttablets.
type Route struct {
Expand Down Expand Up @@ -240,6 +249,8 @@ func (route *Route) executeShards(
queries := getQueries(route.Query, bvs)
result, errs := vcursor.ExecuteMultiShard(ctx, route, rss, queries, false /* rollbackOnError */, false /* canAutocommit */)

route.executeWarmingReplicaRead(ctx, vcursor, bindVars, queries)

if errs != nil {
errs = filterOutNilErrors(errs)
if !route.ScatterErrorsAsWarnings || len(errs) == len(rss) {
Expand Down Expand Up @@ -581,3 +592,42 @@ func getQueries(query string, bvs []map[string]*querypb.BindVariable) []*querypb
func orderByToString(in any) string {
return in.(OrderByParams).String()
}

func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, queries []*querypb.BoundQuery) {
switch route.Opcode {
case Unsharded, Scatter, Equal, EqualUnique, IN, MultiEqual:
// no-op
default:
return
}

if vcursor.GetWarmingReadsPercent() == 0 || rand.Intn(100) > vcursor.GetWarmingReadsPercent() {
return
}

replicaVCursor := vcursor.CloneForReplicaWarming(ctx)
warmingReadsChannel := vcursor.GetWarmingReadsChannel()

select {
// if there's no more room in the channel, drop the warming read
case warmingReadsChannel <- true:
go func(replicaVCursor VCursor) {
defer func() {
<-warmingReadsChannel
}()
rss, _, err := route.findRoute(ctx, replicaVCursor, bindVars)
if err != nil {
return
}

_, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, queries, false /* rollbackOnError */, false /* autocommit */)
if len(errs) > 0 {
log.Warningf("Failed to execute warming replica read: %v", errs)
} else {
replicaWarmingReadsMirrored.Add([]string{route.Keyspace.Name}, 1)
}
}(replicaVCursor)
default:
log.Warning("Failed to execute warming replica read as pool is full")
}
}
30 changes: 18 additions & 12 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ type Executor struct {

// queryLogger is passed in for logging from this vtgate executor.
queryLogger *streamlog.StreamLogger[*logstats.LogStats]

warmingReadsPercent int
warmingReadsChannel chan bool
}

var executorOnce sync.Once
Expand Down Expand Up @@ -148,20 +151,23 @@ func NewExecutor(
schemaTracker SchemaInfo,
noScatter bool,
pv plancontext.PlannerVersion,
warmingReadsPercent int,
) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
plans: plans,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
plans: plans,
warmingReadsPercent: warmingReadsPercent,
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
}

vschemaacl.Init()
Expand Down
41 changes: 33 additions & 8 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/vtgate/logstats"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cache/theine"
Expand All @@ -37,13 +44,9 @@ import (
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"
)
Expand Down Expand Up @@ -185,7 +188,7 @@ func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup
// one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness.
plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false)

executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4)
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor.SetQueryLogger(queryLogger)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
Expand Down Expand Up @@ -218,7 +221,7 @@ func createCustomExecutor(t testing.TB, vschema string) (executor *Executor, sbc

queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4)
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand Down Expand Up @@ -253,10 +256,9 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty
sbcs = append(sbcs, sbc)
}
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)

queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4)
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand All @@ -268,6 +270,29 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty
return executor, sbcs[0], sbcs[1], sbclookup, ctx
}

func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context, warmingReadsPercent int) (executor *Executor, primary, replica *sandboxconn.SandboxConn) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
serv := newSandboxForCells(ctx, []string{cell})
resolver := newTestResolver(ctx, hc, serv, cell)

createSandbox(KsTestUnsharded)
primary = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil)

queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
executor.Close()
cancel()
})
return executor, primary, replica
}

func executorExecSession(ctx context.Context, executor *Executor, sql string, bv map[string]*querypb.BindVariable, session *vtgatepb.Session) (*sqltypes.Result, error) {
return executor.Execute(
ctx,
Expand Down
69 changes: 66 additions & 3 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,7 +1561,7 @@ func TestStreamSelectIN(t *testing.T) {
func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
ex := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4)
ex := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
ex.SetQueryLogger(queryLogger)
return ex
}
Expand Down Expand Up @@ -3185,10 +3185,9 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
})
count++
}

queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor := NewExecutor(ctx, serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4)
executor := NewExecutor(ctx, serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor.SetQueryLogger(queryLogger)
defer executor.Close()
// some sleep for all goroutines to start
Expand Down Expand Up @@ -4152,6 +4151,70 @@ func TestSelectView(t *testing.T) {
utils.MustMatch(t, wantQueries, sbc.Queries)
}

func TestWarmingReads(t *testing.T) {
ctx := utils.LeakCheckContext(t)
executor, primary, replica := createExecutorEnvWithPrimaryReplicaConn(t, ctx, 100)

executor.normalize = true
session := NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded})

_, err := executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user", map[string]*querypb.BindVariable{})
time.Sleep(10 * time.Millisecond)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{Sql: "select age, city from `user`"},
}
utils.MustMatch(t, wantQueries, primary.Queries)
primary.Queries = nil

wantQueriesReplica := []*querypb.BoundQuery{
{Sql: "select age, city from `user`/* warming read */"},
}
utils.MustMatch(t, wantQueriesReplica, replica.Queries)
replica.Queries = nil

_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user /* already has a comment */ ", map[string]*querypb.BindVariable{})
time.Sleep(10 * time.Millisecond)
require.NoError(t, err)
wantQueries = []*querypb.BoundQuery{
{Sql: "select age, city from `user` /* already has a comment */"},
}
utils.MustMatch(t, wantQueries, primary.Queries)
primary.Queries = nil

wantQueriesReplica = []*querypb.BoundQuery{
{Sql: "select age, city from `user` /* already has a comment *//* warming read */"},
}
utils.MustMatch(t, wantQueriesReplica, replica.Queries)
replica.Queries = nil

_, err = executor.Execute(ctx, nil, "TestSelect", session, "insert into user (age, city) values (5, 'Boston')", map[string]*querypb.BindVariable{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_, err = executor.Execute(ctx, nil, "TestSelect", session, "insert into user (age, city) values (5, 'Boston')", map[string]*querypb.BindVariable{})
_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "insert into user (age, city) values (5, 'Boston')", map[string]*querypb.BindVariable{})

time.Sleep(10 * time.Millisecond)
require.NoError(t, err)
require.Nil(t, replica.Queries)

_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "update user set age=5 where city='Boston'", map[string]*querypb.BindVariable{})
time.Sleep(10 * time.Millisecond)
require.NoError(t, err)
require.Nil(t, replica.Queries)

_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "delete from user where city='Boston'", map[string]*querypb.BindVariable{})
time.Sleep(10 * time.Millisecond)
require.NoError(t, err)
require.Nil(t, replica.Queries)
primary.Queries = nil

executor, primary, replica = createExecutorEnvWithPrimaryReplicaConn(t, ctx, 0)
_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user", map[string]*querypb.BindVariable{})
time.Sleep(10 * time.Millisecond)
require.NoError(t, err)
wantQueries = []*querypb.BoundQuery{
{Sql: "select age, city from `user`"},
}
utils.MustMatch(t, wantQueries, primary.Queries)
require.Nil(t, replica.Queries)
}

func TestMain(m *testing.M) {
_flag.ParseFlagsForTest()
os.Exit(m.Run())
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestStreamSQLSharded(t *testing.T) {
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()

executor := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4)
executor := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor.SetQueryLogger(queryLogger)

defer executor.Close()
Expand Down
Loading
Loading