
Fix buffer_drain_concurrency not doing anything #15042

Merged
3 commits merged into vitessio:main on Jan 30, 2024

Conversation

@rbranson (Contributor) commented on Jan 26, 2024:

Description

This is based on #14545 from @arthurschreiber, with some additional changes based on the review feedback from @vmg.

From the previous PR:

As described in #11684, the --buffer_drain_concurrency CLI argument to vtgate does not actually do anything.

This pull request implements the logic required to make this flag actually do something. 😬 When the buffer is drained, we now spawn as many goroutines as specified by --buffer_drain_concurrency to drain the buffer in parallel.

I don't think introducing a new flag as described in #11684 actually makes sense. Instead, I propose we note in the v19 changelog that this flag now does something, and not backport the change to any earlier releases.
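To illustrate the fan-out the description refers to, here is a minimal, self-contained sketch; it is not the actual shard_buffer.go change, and drainAll, the entries slice, and the drain callback are all stand-in names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// drainAll fans buffered entries out across `concurrency` workers. Each worker
// atomically claims the next index until the slice is exhausted, then exits.
func drainAll(entries []string, concurrency int, drain func(string)) {
	var wg sync.WaitGroup
	var cursor atomic.Int64 // shared cursor over the buffered entries

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				idx := cursor.Add(1) - 1
				if idx >= int64(len(entries)) {
					return // nothing left to drain
				}
				drain(entries[idx])
			}
		}()
	}
	wg.Wait()
}

func main() {
	queued := []string{"q1", "q2", "q3", "q4", "q5"}
	drainAll(queued, 2, func(q string) { fmt.Println("retrying", q) })
}

With a concurrency of 1 this degenerates to the old sequential drain, which is why no special case is needed for the non-concurrent path.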

Related Issue(s)

Checklist

  • "Backport to:" labels have been added if this change should be back-ported to release branches
  • If this change is to be back-ported to previous releases, a justification is included in the PR description
  • Tests were added or are not required
  • Did the new or modified tests pass consistently locally and on CI?
  • Documentation was added or is not required

vitess-bot (Contributor) commented on Jan 26, 2024:

Review Checklist

Hello reviewers! 👋 Please follow this checklist when reviewing this Pull Request.

General

  • Ensure that the Pull Request has a descriptive title.
  • Ensure there is a link to an issue (except for internal cleanup and flaky test fixes); new features should have an RFC that documents use cases and test cases.

Tests

  • Bug fixes should have at least one unit or end-to-end test; enhancements and new features should have a sufficient number of tests.

Documentation

  • Apply the release notes (needs details) label if users need to know about this change.
  • New features should be documented.
  • There should be some code comments as to why things are implemented the way they are.
  • There should be a comment at the top of each new or modified test to explain what the test does.

New flags

  • Is this flag really necessary?
  • Flag names must be clear and intuitive, use dashes (-), and have a clear help text.

If a workflow is added or modified:

  • Each item in Jobs should be named in order to mark it as required.
  • If the workflow needs to be marked as required, the maintainer team must be notified.

Backward compatibility

  • Protobuf changes should be wire-compatible.
  • Changes to _vt tables and RPCs need to be backward compatible.
  • RPC changes should be compatible with vitess-operator.
  • If a flag is removed, then it should also be removed from vitess-operator and arewefastyet, if used there.
  • vtctl command output order should be stable and awk-able.

@vitess-bot bot added the NeedsBackportReason, NeedsDescriptionUpdate, NeedsIssue, and NeedsWebsiteDocsUpdate labels on Jan 26, 2024
@github-actions bot added this to the v19.0.0 milestone on Jan 26, 2024
codecov bot commented on Jan 26, 2024:

Codecov Report

All modified and coverable lines are covered by tests ✅

❗ No coverage uploaded for pull request base (main@81777e5).

Additional details and impacted files
@@           Coverage Diff           @@
##             main   #15042   +/-   ##
=======================================
  Coverage        ?   47.69%           
=======================================
  Files           ?     1155           
  Lines           ?   240139           
  Branches        ?        0           
=======================================
  Hits            ?   114526           
  Misses          ?   117012           
  Partials        ?     8601           


@rbranson marked this pull request as ready for review on January 26, 2024
@deepthi added the Type: Bug and Component: Query Serving labels and removed the NeedsWebsiteDocsUpdate, NeedsIssue, and NeedsBackportReason labels on Jan 26, 2024
go/vt/vtgate/buffer/buffer_test.go (outdated review thread, resolved)
go/vt/vtgate/buffer/shard_buffer.go (outdated review thread, resolved)
@deepthi requested review from ajm188 and removed the review request for systay and frouioui on January 26, 2024
@rbranson (Contributor, Author) commented:

The previous implementation, while clever, had a head-of-line blocking (HoLB) issue: because the work was divided among the workers in equal static chunks, slow requests could result in a long tail wait at the end of the drain process.

This new implementation is much smaller and still avoids the overhead and extra logic of pumping a channel to distribute work. In the non-concurrent case, the overhead of the atomic increment (versus a plain loop increment) is negligible, on the order of 10-20 cycles, so IMO it's not worth special-casing the non-concurrent path.

Additionally, the abstraction and unit test coverage on parallelRangeIndex gives me enough confidence in the way the actual work distribution is handled that I don't think we need another set of tests for various buffer sizes.
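
For readers following the thread, this is roughly what an atomic-cursor index distributor like parallelRangeIndex looks like; it is a reconstruction based on the snippets quoted below, not a copy of the merged code, and details may differ:

import "sync/atomic"

// parallelRangeIndex hands out indexes in [0, max] to competing goroutines.
// Each call atomically claims the next index; ok is false once the range is
// exhausted. Reconstructed here for illustration only.
func parallelRangeIndex(counter *atomic.Int64, max int) (int, bool) {
	next := counter.Add(1) // the first caller sees 1, hence the -1 below
	if next-1 > int64(max) {
		return -1, false
	}
	return int(next) - 1, true
}

Because a worker only claims a new index after finishing its current entry, a slow request delays at most one worker rather than an entire pre-assigned chunk, which is what avoids the head-of-line blocking described above.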

@mattlord (Contributor) left a comment:

LGTM! Nice work on this @rbranson !

So far I only had some minor questions/nits/suggestions that are largely a matter of personal preference — so you can address those as you feel best. 🙂

The only requested change that I feel we really should make is this... We have a number of endtoend tests for the buffering here: https://github.com/vitessio/vitess/tree/main/go/test/endtoend/tabletgateway/buffer

We should set the buffer_drain_concurrency to 4 or 8 in at least some of those tests to add additional test coverage and verify the behavior. Please let me know if you see any issues with that or could use any help.
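
As a rough sketch of what that could look like when bringing up vtgate in those tests (clusterInstance and VtGateExtraArgs are assumptions about the endtoend cluster helpers; the buffer tests may plumb flags differently):

// Hypothetical wiring: start vtgate with a drain concurrency above 1 so the
// parallel drain path is actually exercised in the e2e buffering tests.
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
	"--buffer_drain_concurrency=4",
)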


func TestBufferingConcurrent(t *testing.T) {
testAllImplementations(t, func(t *testing.T, fail failover) {
testBuffering1WithOptions(t, fail, 2)
Contributor:

IMO it would be better to use something like 8 here.

Contributor Author:

I disagree, because in many cases the buffers in the unit tests aren't large enough to create an actual concurrent drain at a concurrency of 8.

@mattlord (Contributor), Jan 29, 2024:

I'm not sure why that matters, but 2 is also fine (I was errantly thinking the default for the flag was 8 when I started the review). If buffering fewer connections than the configured concurrency value really is a problem, then that's a problem we should address.

Comment on lines +835 to +837
var mu sync.Mutex
var wg sync.WaitGroup
var counter atomic.Int64
Contributor:

Extremely nitty, but we could use a single var block here.

Comment on lines +850 to +852
mu.Lock()
got = append(got, idx)
mu.Unlock()
@mattlord (Contributor), Jan 27, 2024:

Also annoyingly nitty, but IMO it's nicer to use a closure here:

			appendVal := func(val int) {
				mu.Lock()
				defer mu.Unlock()
				got = append(got, val)
			}
			for i := 0; i < tc.concurrency; i++ {
				go func() {
					defer wg.Done()
					for {
						idx, ok := parallelRangeIndex(&counter, tc.max)
						if !ok {
							return
						}
						appendVal(idx)
					}
				}()
			}

Contributor:

+1

var counter atomic.Int64

wg.Add(tc.concurrency)
got := []int{}
Contributor:

Annoyingly nitty, but we can pre-allocate the space one time:

			got := make([]int, 0, len(tc.calls))

Collaborator:

Yeah our style guide recommends either an empty variable (var got []int) or explicitly allocating the slice. Either way is fine.

for {
idx, ok := parallelRangeIndex(&counter, tc.max)
if !ok {
break
Contributor:

Any reason not to return here?

Comment on lines +572 to +574
// if this is a 32-bit platform, max won't exceed the 32-bit integer limit
// so a cast from a too-large 64-bit int to a 32-bit int will never happen
return int(next) - 1, true
Contributor:

Any reason not to use int64 so that the behavior is deterministic / platform independent and we don't need to cast?

Contributor Author:

Slice indexes are already int (so an int64 return value would have to be cast later anyway), and the overflow concern is mitigated internally in the function, which I think is better than forcing the caller to deal with it.

for {
idx, ok := parallelRangeIndex(&rangeCounter, entryCount-1)
if !ok {
break
Contributor:

Any reason not to return here?

Contributor Author:

break is exactly what I want here: I want the loop to end. In this instance they are functionally equivalent, but using return would have to be undone if any code were ever added after the loop.

@mattlord (Contributor), Jan 29, 2024:

Another way to put it: if we want to exit the goroutine and return when parallelRangeIndex returns false, then IMO we should do so explicitly. That can always be changed if it turns out not to be what we want.

This is also an example of why comments are nice. Why would or wouldn't we want to return there? What might we want to do outside of the for loop? My reading of it was that we want to loop until parallelRangeIndex tells us to stop, and that's the only purpose of the goroutine, so it caught my eye that we weren't explicitly ending the goroutine when it did.

@mattlord removed the NeedsDescriptionUpdate label on Jan 27, 2024
@mattlord (Contributor) commented:

Please note the current DCO failure as well: https://github.com/vitessio/vitess/pull/15042/checks?check_run_id=20915653435

@vmg (Collaborator) left a comment:

Looking good @rbranson, thanks for contributing to Vitess ;)



for idx, tc := range suite {
name := fmt.Sprintf("%d_max%d_concurrency%d", idx, tc.max, tc.concurrency)
t.Run(name, func(t *testing.T) {
Contributor:

small issue around variable shadowing here (for tc): https://stackoverflow.com/a/33459174

Contributor Author:

@ajm188 I'm not sure I follow. The tests are working properly; the t.Run call and the goroutines below terminate before the next loop iteration.
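
For reference, the usual pre-Go 1.22 guard against closures capturing range variables is a per-iteration copy; this is a generic sketch rather than the PR's code, and it would be harmless here even though each subtest finishes before the next iteration:

for idx, tc := range suite {
	idx, tc := idx, tc // give each closure its own copy of the loop variables
	name := fmt.Sprintf("%d_max%d_concurrency%d", idx, tc.max, tc.concurrency)
	t.Run(name, func(t *testing.T) {
		// ... test body unchanged ...
	})
}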


@rbranson force-pushed the arthur/fix-buffer-drain-concurrency branch from bdc573c to a7bb30f on January 29, 2024
@rbranson (Contributor, Author) commented:

Had to squash to fix the commit signing issue

@rbranson requested a review from mattlord on January 29, 2024
@mattlord (Contributor) left a comment:

@rbranson Thanks for adding the e2e test change! That was the only blocker for me.

Thanks again for the contribution! 😃

@mattlord merged commit c81a791 into vitessio:main on Jan 30, 2024
102 checks passed
@rbranson deleted the arthur/fix-buffer-drain-concurrency branch on January 30, 2024

Successfully merging this pull request may close these issues.

Bug Report: buffer_drain_concurrency flag does nothing
7 participants