-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl: dynamically adjusting the max write speed of reorganization job #57611
base: master
Are you sure you want to change the base?
Conversation
Hi @fzzf678. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
/ok-to-test |
/test all |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #57611 +/- ##
================================================
+ Coverage 72.7907% 74.1107% +1.3199%
================================================
Files 1676 1721 +45
Lines 463750 471832 +8082
================================================
+ Hits 337567 349678 +12111
+ Misses 105323 99849 -5474
- Partials 20860 22305 +1445
Flags with carried forward coverage won't be shown. Click here to find out more.
|
@@ -75,8 +75,9 @@ type DDLReorgMeta struct { | |||
// These two variables are used to control the concurrency and batch size of the reorganization process. | |||
// They can be adjusted dynamically through `admin alter ddl jobs` command. | |||
// Note: Don't get or set these two variables directly, use the functions instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make these 3 var private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will try make these vars private in next PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: CbcWestwolf, winoros The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
Co-authored-by: CbcWestwolf <[email protected]>
/cc @lance6716, please help take a look |
pkg/meta/model/reorg.go
Outdated
// GetMaxWriteSpeedOrDefault gets the max write speed from DDLReorgMeta. | ||
// 0 means no limit. | ||
func (dm *DDLReorgMeta) GetMaxWriteSpeedOrDefault(defaultVal int) int { | ||
if dm == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be handled in where it's unmarshalled in a unified way, not check in every method
pkg/meta/model/reorg.go
Outdated
if dm == nil { | ||
return defaultVal | ||
} | ||
return int(atomic.LoadInt64(&dm.MaxWriteSpeed)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use atomic.XXX type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the reason is atomic.XXX
does not implement (un)marshal interface, so we must store the plain type in struct and use helper functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
golang's doesn't, but uber's does
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use the atomic.XXX type, seems these three fields wiil meet unmarshal error when the job meta is from old version(upgrade)? Seems not, store int64 in the under layer
// MarshalJSON encodes the wrapped int64 into JSON.
func (i *Int64) MarshalJSON() ([]byte, error) {
return json.Marshal(i.Load())
}
// UnmarshalJSON decodes JSON into the wrapped int64.
func (i *Int64) UnmarshalJSON(b []byte) error {
var v int64
if err := json.Unmarshal(b, &v); err != nil {
return err
}
i.Store(v)
return nil
}
Co-authored-by: D3Hunter <[email protected]>
use util.WaitGroupWrapper rename limit related func use atomic.int64 in storeWriteLimiter remove noopStoreWriteLimit
@@ -546,6 +546,9 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { | |||
// decode special args for this job. | |||
func (job *Job) Decode(b []byte) error { | |||
err := json.Unmarshal(b, job) | |||
if job.MayNeedReorg() && job.ReorgMeta == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MayNeedReorg
depends on a context var set at DDL executor, when job worker run it, it's not there
maybe move it to where job.ReorgMeta
is used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update in a128426 PTAL again
rest lgtm |
speed int64 | ||
err error | ||
) | ||
v := opt.Value.(*expression.Constant) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add unit tests to make sure it will always be a constant. For example, ALTER ... max_write_speed = RAND() should fail with friendly error message
pkg/planner/core/planbuilder.go
Outdated
case types.ETInt: | ||
speed = v.Value.GetInt64() | ||
default: | ||
return 0, fmt.Errorf("the value %s for %s is invalid", opt.Name, opt.Value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a unit test to check the error message is readable in this case.
pkg/planner/core/planbuilder.go
Outdated
speedStr := v.Value.GetString() | ||
speed, err = units.RAMInBytes(speedStr) | ||
if err != nil { | ||
return 0, errors.Trace(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also check error mesage is readable when failed to parse
gotTokens += n | ||
} | ||
elapsed := time.Since(start) | ||
maxTokens := 120 + int(float64(elapsed.Seconds())*float64(maxT)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elapsed.Seconds() is already float64. Your IDE should notice this for you 🤔
elapsed := time.Since(start) | ||
maxTokens := 120 + int(float64(elapsed.Seconds())*float64(maxT)) | ||
// In theory, gotTokens should be less than or equal to maxT. | ||
// But we allow a little of error to avoid the test being flaky. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you find this test is flaky? I think the sleep-related functions guarantee that at least some duration it will block. So the gotTokens
should always be less or equal than maxT * elapsed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see it's because out limiter "Allow burst of at most 20% of the writeLimit." We can change hardcoded 120 to 1.2*maxT then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update in a1cc392
Co-authored-by: lance6716 <[email protected]>
Co-authored-by: lance6716 <[email protected]>
Co-authored-by: lance6716 <[email protected]>
pkg/ddl/db_test.go
Outdated
@@ -1195,6 +1195,17 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { | |||
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]") | |||
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]") | |||
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]") | |||
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2PiB for max_write_speed is out of range [0, 1125899906842624]") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some error message cc @lance6716
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The negative tests cases are not enough. Please check at least max_write_speed = 1.23, max_write_speed = RAND(), max_write_speed = 30+40, max_write_speed = 'asdasd', and maybe ask other reviewers to come up with more cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some cases and fix comment in d71a826 PTAL again
@fzzf678: The following test failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
What problem does this PR solve?
Issue Number: ref #57526 #57229
Problem Summary: See #57229
What changed and how does it work?
Supply later
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.