-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add option for warming reads to mirror primary read queries onto replicas from vtgates to warm bufferpools #13206
Merged
Merged
Changes from 14 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
063658c
add option for warming reads to mirror primary read queries onto repl…
9d28448
first pass of addressing pr comments
deb93d1
remove accidentally committed file
1d3922a
add bounded pool for warming reads, metrics, query timeout
7a3b44d
defer draining pool after warming read
d3b09e2
fix tests
ad6a796
add type to flag
a1bd89e
fix linting
08adde1
simplify test setup
9e487ef
do not cancel context
d15adc7
bump to 500 pool size
c8576ca
fix flags
a036088
Apply suggestions from code review
olyazavr 5892603
add flags to vtcombo and omit defaults in description
96067a1
rename warming reads pool to channel, use concurrency for flag name
dd0444c
rename warming reads pool variable and comment
498d1dc
remove default 0 from flags
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -1561,7 +1561,7 @@ func TestStreamSelectIN(t *testing.T) { | |||||
func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor { | ||||||
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) | ||||||
plans := DefaultPlanCache() | ||||||
ex := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4) | ||||||
ex := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) | ||||||
ex.SetQueryLogger(queryLogger) | ||||||
return ex | ||||||
} | ||||||
|
@@ -3185,10 +3185,9 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { | |||||
}) | ||||||
count++ | ||||||
} | ||||||
|
||||||
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) | ||||||
plans := DefaultPlanCache() | ||||||
executor := NewExecutor(ctx, serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4) | ||||||
executor := NewExecutor(ctx, serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) | ||||||
executor.SetQueryLogger(queryLogger) | ||||||
defer executor.Close() | ||||||
// some sleep for all goroutines to start | ||||||
|
@@ -4152,6 +4151,70 @@ func TestSelectView(t *testing.T) { | |||||
utils.MustMatch(t, wantQueries, sbc.Queries) | ||||||
} | ||||||
|
||||||
func TestWarmingReads(t *testing.T) { | ||||||
ctx := utils.LeakCheckContext(t) | ||||||
executor, primary, replica := createExecutorEnvWithPrimaryReplicaConn(t, ctx, 100) | ||||||
|
||||||
executor.normalize = true | ||||||
session := NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}) | ||||||
|
||||||
_, err := executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user", map[string]*querypb.BindVariable{}) | ||||||
time.Sleep(10 * time.Millisecond) | ||||||
require.NoError(t, err) | ||||||
wantQueries := []*querypb.BoundQuery{ | ||||||
{Sql: "select age, city from `user`"}, | ||||||
} | ||||||
utils.MustMatch(t, wantQueries, primary.Queries) | ||||||
primary.Queries = nil | ||||||
|
||||||
wantQueriesReplica := []*querypb.BoundQuery{ | ||||||
{Sql: "select age, city from `user`/* warming read */"}, | ||||||
} | ||||||
utils.MustMatch(t, wantQueriesReplica, replica.Queries) | ||||||
replica.Queries = nil | ||||||
|
||||||
_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user /* already has a comment */ ", map[string]*querypb.BindVariable{}) | ||||||
time.Sleep(10 * time.Millisecond) | ||||||
require.NoError(t, err) | ||||||
wantQueries = []*querypb.BoundQuery{ | ||||||
{Sql: "select age, city from `user` /* already has a comment */"}, | ||||||
} | ||||||
utils.MustMatch(t, wantQueries, primary.Queries) | ||||||
primary.Queries = nil | ||||||
|
||||||
wantQueriesReplica = []*querypb.BoundQuery{ | ||||||
{Sql: "select age, city from `user` /* already has a comment *//* warming read */"}, | ||||||
} | ||||||
utils.MustMatch(t, wantQueriesReplica, replica.Queries) | ||||||
replica.Queries = nil | ||||||
|
||||||
_, err = executor.Execute(ctx, nil, "TestSelect", session, "insert into user (age, city) values (5, 'Boston')", map[string]*querypb.BindVariable{}) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
time.Sleep(10 * time.Millisecond) | ||||||
require.NoError(t, err) | ||||||
require.Nil(t, replica.Queries) | ||||||
|
||||||
_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "update user set age=5 where city='Boston'", map[string]*querypb.BindVariable{}) | ||||||
time.Sleep(10 * time.Millisecond) | ||||||
require.NoError(t, err) | ||||||
require.Nil(t, replica.Queries) | ||||||
|
||||||
_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "delete from user where city='Boston'", map[string]*querypb.BindVariable{}) | ||||||
time.Sleep(10 * time.Millisecond) | ||||||
require.NoError(t, err) | ||||||
require.Nil(t, replica.Queries) | ||||||
primary.Queries = nil | ||||||
|
||||||
executor, primary, replica = createExecutorEnvWithPrimaryReplicaConn(t, ctx, 0) | ||||||
_, err = executor.Execute(ctx, nil, "TestWarmingReads", session, "select age, city from user", map[string]*querypb.BindVariable{}) | ||||||
time.Sleep(10 * time.Millisecond) | ||||||
require.NoError(t, err) | ||||||
wantQueries = []*querypb.BoundQuery{ | ||||||
{Sql: "select age, city from `user`"}, | ||||||
} | ||||||
utils.MustMatch(t, wantQueries, primary.Queries) | ||||||
require.Nil(t, replica.Queries) | ||||||
} | ||||||
|
||||||
func TestMain(m *testing.M) { | ||||||
_flag.ParseFlagsForTest() | ||||||
os.Exit(m.Run()) | ||||||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Vitess, pools usually mean connection pools. When I read the description I thought it's a waitGroup or something like that. Actually it turns out to be a
channel bool
whose capacity is set by this flag.The flag name and description are misleading. They need to be changed to reflect the actual usage. It should be something like
--warming-reads-concurrency
and be documented asNumber of concurrent warming reads allowed
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I edited the PR description at the top to list all 3 flags. Once these changes are made, that needs to change again.