Fix buffer_drain_concurrency not doing anything #15042
Conversation
Signed-off-by: Arthur Schreiber <[email protected]>
Review Checklist

Hello reviewers! 👋 Please follow this checklist when reviewing this Pull Request.

General
Tests
Documentation
New flags
If a workflow is added or modified
Backward compatibility
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@           Coverage Diff            @@
##             main    #15042   +/-   ##
=========================================
  Coverage        ?    47.69%
=========================================
  Files           ?      1155
  Lines           ?    240139
  Branches        ?         0
=========================================
  Hits            ?    114526
  Misses          ?    117012
  Partials        ?      8601

☔ View full report in Codecov by Sentry.
The previous implementation, while clever, had a head-of-line-blocking (HoLB) issue: because the work was divided equally among the workers up front, slow requests could cause a long tail wait at the end of the drain process. This new implementation is much smaller and still avoids the overhead and extra logic of pumping a channel to distribute work. In the non-concurrent case, the overhead of the atomic increment (versus a plain loop increment) is extremely small, on the order of 10-20 cycles, so IMO it's not really worth special-casing. Additionally, the abstraction and unit test coverage on …
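A minimal sketch of that approach, assuming a shared sync/atomic counter; the exact shape of parallelRangeIndex in the PR may differ, but this is consistent with the snippets reviewed below:

// Each drain worker calls parallelRangeIndex in a loop. The shared counter
// hands out indexes 0..max one at a time, so a slow request only delays the
// worker that picked it up rather than a whole pre-assigned chunk of the range.
func parallelRangeIndex(counter *atomic.Int64, max int) (int, bool) {
	next := counter.Add(1)
	if next-1 > int64(max) {
		return 0, false // range exhausted, this worker can stop
	}
	// if this is a 32-bit platform, max won't exceed the 32-bit integer
	// limit, so a cast from a too-large 64-bit int will never happen
	return int(next) - 1, true
}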
LGTM! Nice work on this @rbranson !
So far I only had some minor questions/nits/suggestions that are largely a matter of personal preference — so you can address those as you feel best. 🙂
The only requested change that I feel we really should make is this: we have a number of end-to-end tests for the buffering here: https://github.com/vitessio/vitess/tree/main/go/test/endtoend/tabletgateway/buffer

We should set buffer_drain_concurrency to 4 or 8 in at least some of those tests to add additional test coverage and verify the behavior. Please let me know if you see any issues with that or could use any help.
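For illustration only, a sketch of how one of those end-to-end tests could pass the flag, assuming the suite uses the shared cluster.LocalProcessCluster harness and its VtGateExtraArgs field (this is not taken from the actual test code):

// hypothetical setup fragment; the flag names are real, the surrounding code is not
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
	"--enable_buffer",
	"--buffer_drain_concurrency=4",
)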
func TestBufferingConcurrent(t *testing.T) {
	testAllImplementations(t, func(t *testing.T, fail failover) {
		testBuffering1WithOptions(t, fail, 2)
IMO it would be better to use something like 8 here.
Disagree, because the buffers in the unit tests aren't large enough to create an actual concurrent drain at 8 in many cases.
I'm not sure why that matters, but 2 is also fine (I was errantly thinking the default for the flag was 8 when I started the review). If buffering fewer connections than the concurrency value is actually a problem, then that's something we should address.
var mu sync.Mutex
var wg sync.WaitGroup
var counter atomic.Int64
Extremely nitty, but we could use a single var block here.
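For illustration, the single var block the comment refers to would look like:

var (
	mu      sync.Mutex
	wg      sync.WaitGroup
	counter atomic.Int64
)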
mu.Lock()
got = append(got, idx)
mu.Unlock()
Also annoyingly nitty, but IMO it's nicer to use a closure here:
appendVal := func(val int) {
	mu.Lock()
	defer mu.Unlock()
	got = append(got, val)
}

for i := 0; i < tc.concurrency; i++ {
	go func() {
		defer wg.Done()
		for {
			idx, ok := parallelRangeIndex(&counter, tc.max)
			if !ok {
				return
			}
			appendVal(idx)
		}
	}()
}
+1
go/vt/vtgate/buffer/buffer_test.go (outdated)

var counter atomic.Int64

wg.Add(tc.concurrency)
got := []int{}
Annoyingly nitty, but we can pre-allocate the space one time:
got := make([]int, 0, len(tc.calls))
Yeah, our style guide recommends either an empty variable (var got []int) or explicitly allocating the slice. Either way is fine.
for {
	idx, ok := parallelRangeIndex(&counter, tc.max)
	if !ok {
		break
Any reason not to return here?
// if this is a 32-bit platform, max won't exceed the 32-bit integer limit
// so a cast from a too-large 64-bit int to a 32-bit int will never happen
return int(next) - 1, true
Any reason not to use int64 so that the behavior is deterministic / platform independent and we don't need to cast?
Slice indexes are already int (we'd have to cast later anyway), and the issue is mitigated internally in the function, which I think is better than forcing the caller to deal with it.
for {
	idx, ok := parallelRangeIndex(&rangeCounter, entryCount-1)
	if !ok {
		break
Any reason not to return here?
break is exactly what I want here: I want the loop to end. In this instance they are functionally equivalent, but return would require modifying the function to place any code after the loop.
Another way to put it: if we want to exit the goroutine and return when parallelRangeIndex returns false, then IMO we should explicitly do so. That can always be changed if that's not what we want.

This is also an example of why comments are nice. Why would or wouldn't we want to return there? What might we want to do outside of the for loop? My reading of it was that we want to loop until parallelRangeIndex tells us to stop, and that's the only purpose of the goroutine. So it caught my eye that we weren't explicitly ending the goroutine when it did.
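To illustrate the trade-off both comments are describing (a hypothetical worker body, not the actual drain code): with break, anything placed after the loop still runs inside the goroutine, whereas return would skip it.

go func() {
	defer wg.Done()
	for {
		idx, ok := parallelRangeIndex(&rangeCounter, entryCount-1)
		if !ok {
			break // end the loop, but keep executing the rest of the goroutine
		}
		drainEntry(idx) // hypothetical per-entry drain step
	}
	// per-worker cleanup, logging, or metrics placed here still run;
	// an early return above would bypass this block
}()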
Please note the current DCO failure as well: https://github.com/vitessio/vitess/pull/15042/checks?check_run_id=20915653435
Looking good @rbranson, thanks for contributing to Vitess ;)
for idx, tc := range suite {
	name := fmt.Sprintf("%d_max%d_concurrency%d", idx, tc.max, tc.concurrency)
	t.Run(name, func(t *testing.T) {
Small issue around variable shadowing here (for tc): https://stackoverflow.com/a/33459174
@ajm188 Not sure I follow; the tests are working properly. The t.Run call and the goroutines below terminate before the next loop iteration.
mu.Lock()
got = append(got, idx)
mu.Unlock()
+1
Signed-off-by: Rick Branson <[email protected]>

Force-pushed from bdc573c to a7bb30f
Had to squash to fix the commit signing issue
Signed-off-by: Rick Branson <[email protected]>
@rbranson Thanks for adding the e2e test change! That was the only blocker for me.
Thanks again for the contribution! 😃
Description
This is based on a PR from @arthurschreiber (#14545), with some additional changes based on the review feedback from @vmg.
From the previous PR:
Related Issue(s)
Checklist