diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 0357932f070..ba6597dac69 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -421,6 +421,9 @@ Flags: --vtgate_grpc_server_name string the server name to use to validate server certificate --vttablet_skip_buildinfo_tags string comma-separated list of buildinfo tags to skip from merging with --init_tags. each tag is either an exact match or a regular expression of the form '/regexp/'. (default "/.*/") --wait_for_backup_interval duration (init restore parameter) if this is greater than 0, instead of starting up empty when no backups are found, keep checking at this interval for a backup to appear + --warming-reads-concurrency int Number of concurrent warming reads allowed (default 500) + --warming-reads-percent int Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm + --warming-reads-query-timeout duration Timeout of warming read queries (default 5s) --warn_memory_rows int Warning threshold for in-memory results. A row count higher than this amount will cause the VtGateWarnings.ResultsExceeded counter to be incremented. (default 30000) --warn_payload_size int The warning threshold for query payloads in bytes. A payload greater than this threshold will cause the VtGateWarnings.WarnPayloadSizeExceeded counter to be incremented. --warn_sharded_only If any features that are only available in unsharded mode are used, query execution warnings will be added to the session diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index a496b2765d2..1106b74e581 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -229,6 +229,9 @@ Flags: --vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging --vschema_ddl_authorized_users string List of users authorized to execute vschema ddl operations, or '%' to allow all users. --vtgate-config-terse-errors prevent bind vars from escaping in returned errors + --warming-reads-concurrency int Number of concurrent warming reads allowed (default 500) + --warming-reads-percent int Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm + --warming-reads-query-timeout duration Timeout of warming read queries (default 5s) --warn_memory_rows int Warning threshold for in-memory results. A row count higher than this amount will cause the VtGateWarnings.ResultsExceeded counter to be incremented. (default 30000) --warn_payload_size int The warning threshold for query payloads in bytes. A payload greater than this threshold will cause the VtGateWarnings.WarnPayloadSizeExceeded counter to be incremented. --warn_sharded_only If any features that are only available in unsharded mode are used, query execution warnings will be added to the session diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index bbeb99e0e36..8167c510b01 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -75,7 +75,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShar var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests queryLogBufferSize := 10 plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) - vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion) + vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0) vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) return nil diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 139223d4d09..79e7c7979c2 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -89,6 +89,18 @@ func (t *noopVCursor) ReleaseLock(context.Context) error { panic("implement me") } +func (t *noopVCursor) GetWarmingReadsPercent() int { + panic("implement me") +} + +func (t *noopVCursor) GetWarmingReadsChannel() chan bool { + panic("implement me") +} + +func (t *noopVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { + panic("implement me") +} + func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) error { panic("implement me") } @@ -481,6 +493,18 @@ func (f *loggingVCursor) RecordWarning(warning *querypb.QueryWarning) { f.warnings = append(f.warnings, warning) } +func (f *loggingVCursor) GetWarmingReadsPercent() int { + return 0 +} + +func (f *loggingVCursor) GetWarmingReadsChannel() chan bool { + return make(chan bool) +} + +func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { + return f +} + func (f *loggingVCursor) Execute(ctx context.Context, method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { name := "Unknown" switch co { diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index b5d67c9d994..1721f1ea063 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -119,6 +119,15 @@ type ( // ReleaseLock releases all the held advisory locks. ReleaseLock(ctx context.Context) error + + // GetWarmingReadsPercent gets the percentage of queries to clone to replicas for bufferpool warming + GetWarmingReadsPercent() int + + // GetWarmingReadsChannel returns the channel for executing warming reads against replicas + GetWarmingReadsChannel() chan bool + + // CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas + CloneForReplicaWarming(ctx context.Context) VCursor } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 80c4f181830..1f806867b70 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -19,6 +19,7 @@ package engine import ( "context" "fmt" + "math/rand" "sort" "strconv" "strings" @@ -26,6 +27,7 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/sqltypes" @@ -41,6 +43,13 @@ import ( var _ Primitive = (*Route)(nil) +var ( + replicaWarmingReadsMirrored = stats.NewCountersWithMultiLabels( + "ReplicaWarmingReadsMirrored", + "Number of reads mirrored to replicas to warm their bufferpools", + []string{"Keyspace"}) +) + // Route represents the instructions to route a read query to // one or many vttablets. type Route struct { @@ -240,6 +249,8 @@ func (route *Route) executeShards( queries := getQueries(route.Query, bvs) result, errs := vcursor.ExecuteMultiShard(ctx, route, rss, queries, false /* rollbackOnError */, false /* canAutocommit */) + route.executeWarmingReplicaRead(ctx, vcursor, bindVars, queries) + if errs != nil { errs = filterOutNilErrors(errs) if !route.ScatterErrorsAsWarnings || len(errs) == len(rss) { @@ -581,3 +592,42 @@ func getQueries(query string, bvs []map[string]*querypb.BindVariable) []*querypb func orderByToString(in any) string { return in.(OrderByParams).String() } + +func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, queries []*querypb.BoundQuery) { + switch route.Opcode { + case Unsharded, Scatter, Equal, EqualUnique, IN, MultiEqual: + // no-op + default: + return + } + + if vcursor.GetWarmingReadsPercent() == 0 || rand.Intn(100) > vcursor.GetWarmingReadsPercent() { + return + } + + replicaVCursor := vcursor.CloneForReplicaWarming(ctx) + warmingReadsChannel := vcursor.GetWarmingReadsChannel() + + select { + // if there's no more room in the channel, drop the warming read + case warmingReadsChannel <- true: + go func(replicaVCursor VCursor) { + defer func() { + <-warmingReadsChannel + }() + rss, _, err := route.findRoute(ctx, replicaVCursor, bindVars) + if err != nil { + return + } + + _, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, queries, false /* rollbackOnError */, false /* autocommit */) + if len(errs) > 0 { + log.Warningf("Failed to execute warming replica read: %v", errs) + } else { + replicaWarmingReadsMirrored.Add([]string{route.Keyspace.Name}, 1) + } + }(replicaVCursor) + default: + log.Warning("Failed to execute warming replica read as pool is full") + } +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 5b94183a950..8c06220b422 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -119,6 +119,9 @@ type Executor struct { // queryLogger is passed in for logging from this vtgate executor. queryLogger *streamlog.StreamLogger[*logstats.LogStats] + + warmingReadsPercent int + warmingReadsChannel chan bool } var executorOnce sync.Once @@ -148,20 +151,23 @@ func NewExecutor( schemaTracker SchemaInfo, noScatter bool, pv plancontext.PlannerVersion, + warmingReadsPercent int, ) *Executor { e := &Executor{ - serv: serv, - cell: cell, - resolver: resolver, - scatterConn: resolver.scatterConn, - txConn: resolver.scatterConn.txConn, - normalize: normalize, - warnShardedOnly: warnOnShardedOnly, - streamSize: streamSize, - schemaTracker: schemaTracker, - allowScatter: !noScatter, - pv: pv, - plans: plans, + serv: serv, + cell: cell, + resolver: resolver, + scatterConn: resolver.scatterConn, + txConn: resolver.scatterConn.txConn, + normalize: normalize, + warnShardedOnly: warnOnShardedOnly, + streamSize: streamSize, + schemaTracker: schemaTracker, + allowScatter: !noScatter, + pv: pv, + plans: plans, + warmingReadsPercent: warmingReadsPercent, + warmingReadsChannel: make(chan bool, warmingReadsConcurrency), } vschemaacl.Init() diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 107215d6f4d..1da2cd10154 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -26,6 +26,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/sidecardb" + "vitess.io/vitess/go/vt/vtgate/logstats" + + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/cache/theine" @@ -37,13 +44,9 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - "vitess.io/vitess/go/vt/sidecardb" "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vtgate/logstats" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/sandboxconn" ) @@ -185,7 +188,7 @@ func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup // one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness. plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false) - executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4) + executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) executor.SetQueryLogger(queryLogger) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} @@ -218,7 +221,7 @@ func createCustomExecutor(t testing.TB, vschema string) (executor *Executor, sbc queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4) + executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -253,10 +256,9 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty sbcs = append(sbcs, sbc) } sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) - queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4) + executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -268,6 +270,29 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty return executor, sbcs[0], sbcs[1], sbclookup, ctx } +func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context, warmingReadsPercent int) (executor *Executor, primary, replica *sandboxconn.SandboxConn) { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + cell := "aa" + hc := discovery.NewFakeHealthCheck(nil) + serv := newSandboxForCells(ctx, []string{cell}) + resolver := newTestResolver(ctx, hc, serv, cell) + + createSandbox(KsTestUnsharded) + primary = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) + replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + + queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) + executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent) + executor.SetQueryLogger(queryLogger) + + t.Cleanup(func() { + executor.Close() + cancel() + }) + return executor, primary, replica +} + func executorExecSession(ctx context.Context, executor *Executor, sql string, bv map[string]*querypb.BindVariable, session *vtgatepb.Session) (*sqltypes.Result, error) { return executor.Execute( ctx, diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 829cd844fd1..a0da617c883 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1561,7 +1561,7 @@ func TestStreamSelectIN(t *testing.T) { func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - ex := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4) + ex := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) ex.SetQueryLogger(queryLogger) return ex } @@ -3185,10 +3185,9 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { }) count++ } - queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4) + executor := NewExecutor(ctx, serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start @@ -4152,6 +4151,70 @@ func TestSelectView(t *testing.T) { utils.MustMatch(t, wantQueries, sbc.Queries) } +func TestWarmingReads(t *testing.T) { + ctx := utils.LeakCheckContext(t) + executor, primary, replica := createExecutorEnvWithPrimaryReplicaConn(t, ctx, 100) + + executor.normalize = true + session := NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}) + + _, err := executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user", map[string]*querypb.BindVariable{}) + time.Sleep(10 * time.Millisecond) + require.NoError(t, err) + wantQueries := []*querypb.BoundQuery{ + {Sql: "select age, city from `user`"}, + } + utils.MustMatch(t, wantQueries, primary.Queries) + primary.Queries = nil + + wantQueriesReplica := []*querypb.BoundQuery{ + {Sql: "select age, city from `user`/* warming read */"}, + } + utils.MustMatch(t, wantQueriesReplica, replica.Queries) + replica.Queries = nil + + _, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user /* already has a comment */ ", map[string]*querypb.BindVariable{}) + time.Sleep(10 * time.Millisecond) + require.NoError(t, err) + wantQueries = []*querypb.BoundQuery{ + {Sql: "select age, city from `user` /* already has a comment */"}, + } + utils.MustMatch(t, wantQueries, primary.Queries) + primary.Queries = nil + + wantQueriesReplica = []*querypb.BoundQuery{ + {Sql: "select age, city from `user` /* already has a comment *//* warming read */"}, + } + utils.MustMatch(t, wantQueriesReplica, replica.Queries) + replica.Queries = nil + + _, err = executor.Execute(ctx, nil, "TestSelect", session, "insert into user (age, city) values (5, 'Boston')", map[string]*querypb.BindVariable{}) + time.Sleep(10 * time.Millisecond) + require.NoError(t, err) + require.Nil(t, replica.Queries) + + _, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "update user set age=5 where city='Boston'", map[string]*querypb.BindVariable{}) + time.Sleep(10 * time.Millisecond) + require.NoError(t, err) + require.Nil(t, replica.Queries) + + _, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "delete from user where city='Boston'", map[string]*querypb.BindVariable{}) + time.Sleep(10 * time.Millisecond) + require.NoError(t, err) + require.Nil(t, replica.Queries) + primary.Queries = nil + + executor, primary, replica = createExecutorEnvWithPrimaryReplicaConn(t, ctx, 0) + _, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user", map[string]*querypb.BindVariable{}) + time.Sleep(10 * time.Millisecond) + require.NoError(t, err) + wantQueries = []*querypb.BoundQuery{ + {Sql: "select age, city from `user`"}, + } + utils.MustMatch(t, wantQueries, primary.Queries) + require.Nil(t, replica.Queries) +} + func TestMain(m *testing.M) { _flag.ParseFlagsForTest() os.Exit(m.Run()) diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index e5c730eb157..5ef00fd0691 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -68,7 +68,7 @@ func TestStreamSQLSharded(t *testing.T) { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4) + executor := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) executor.SetQueryLogger(queryLogger) defer executor.Close() diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 8f65884dba3..d4c48a990a2 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -113,6 +113,9 @@ type vcursorImpl struct { warnings []*querypb.QueryWarning // any warnings that are accumulated during the planning phase are stored here pv plancontext.PlannerVersion + + warmingReadsPercent int + warmingReadsChannel chan bool } // newVcursorImpl creates a vcursorImpl. Before creating this object, you have to separate out any marginComments that came with @@ -157,21 +160,29 @@ func newVCursorImpl( connCollation = collations.Default() } + warmingReadsPct := 0 + var warmingReadsChan chan bool + if executor != nil { + warmingReadsPct = executor.warmingReadsPercent + warmingReadsChan = executor.warmingReadsChannel + } return &vcursorImpl{ - safeSession: safeSession, - keyspace: keyspace, - tabletType: tabletType, - destination: destination, - marginComments: marginComments, - executor: executor, - logStats: logStats, - collation: connCollation, - resolver: resolver, - vschema: vschema, - vm: vm, - topoServer: ts, - warnShardedOnly: warnShardedOnly, - pv: pv, + safeSession: safeSession, + keyspace: keyspace, + tabletType: tabletType, + destination: destination, + marginComments: marginComments, + executor: executor, + logStats: logStats, + collation: connCollation, + resolver: resolver, + vschema: vschema, + vm: vm, + topoServer: ts, + warnShardedOnly: warnShardedOnly, + pv: pv, + warmingReadsPercent: warmingReadsPct, + warmingReadsChannel: warmingReadsChan, }, nil } @@ -1268,3 +1279,43 @@ func (vc *vcursorImpl) StorePrepareData(stmtName string, prepareData *vtgatepb.P func (vc *vcursorImpl) GetPrepareData(stmtName string) *vtgatepb.PrepareData { return vc.safeSession.GetPrepareData(stmtName) } + +func (vc *vcursorImpl) GetWarmingReadsPercent() int { + return vc.warmingReadsPercent +} + +func (vc *vcursorImpl) GetWarmingReadsChannel() chan bool { + return vc.warmingReadsChannel +} + +func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCursor { + callerId := callerid.EffectiveCallerIDFromContext(ctx) + immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx) + + timedCtx, _ := context.WithTimeout(context.Background(), warmingReadsQueryTimeout) //nolint + clonedCtx := callerid.NewContext(timedCtx, callerId, immediateCallerId) + + v := &vcursorImpl{ + safeSession: NewAutocommitSession(vc.safeSession.Session), + keyspace: vc.keyspace, + tabletType: topodatapb.TabletType_REPLICA, + destination: vc.destination, + marginComments: vc.marginComments, + executor: vc.executor, + resolver: vc.resolver, + topoServer: vc.topoServer, + logStats: &logstats.LogStats{Ctx: clonedCtx}, + collation: vc.collation, + ignoreMaxMemoryRows: vc.ignoreMaxMemoryRows, + vschema: vc.vschema, + vm: vc.vm, + semTable: vc.semTable, + warnShardedOnly: vc.warnShardedOnly, + warnings: vc.warnings, + pv: vc.pv, + } + + v.marginComments.Trailing += "/* warming read */" + + return v +} diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 8d8cd2885a3..b66ea93226e 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -111,6 +111,10 @@ var ( // allowKillStmt to allow execution of kill statement. allowKillStmt bool + + warmingReadsPercent = 0 + warmingReadsQueryTimeout = 5 * time.Second + warmingReadsConcurrency = 500 ) func registerFlags(fs *pflag.FlagSet) { @@ -144,6 +148,9 @@ func registerFlags(fs *pflag.FlagSet) { fs.DurationVar(&messageStreamGracePeriod, "message_stream_grace_period", messageStreamGracePeriod, "the amount of time to give for a vttablet to resume if it ends a message stream, usually because of a reparent.") fs.BoolVar(&enableViews, "enable-views", enableViews, "Enable views support in vtgate.") fs.BoolVar(&allowKillStmt, "allow-kill-statement", allowKillStmt, "Allows the execution of kill statement") + fs.IntVar(&warmingReadsPercent, "warming-reads-percent", 0, "Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm") + fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed") + fs.DurationVar(&warmingReadsQueryTimeout, "warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries") _ = fs.String("schema_change_signal_user", "", "User to be used to send down query to vttablet to retrieve schema changes") _ = fs.MarkDeprecated("schema_change_signal_user", "schema tracking uses an internal api and does not require a user to be specified") @@ -319,6 +326,7 @@ func Init( si, noScatter, pv, + warmingReadsPercent, ) if err := executor.defaultQueryLogger(); err != nil {