-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Heartbeat writer can always generate on-demand leased heartbeats, even if not at all configured #16014
Heartbeat writer can always generate on-demand leased heartbeats, even if not at all configured #16014
Changes from 1 commit
79a37f4
f89eeb0
2d29eda
1116f80
02ca773
ad8950a
58be5f2
1fd2b41
0ba8682
236f074
63ab624
a8673c1
2d0fb4a
8d39664
875a2d6
73046cf
81b9d53
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
…ectively. We use golang atomic LoadPointer/StorePointer to avoid using mutexes in RequestHeartbeats Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,7 +39,6 @@ func TestWriteHeartbeat(t *testing.T) { | |
|
||
now := time.Now() | ||
tw := newSimpleTestWriter(db, &now) | ||
defer tw.stop() | ||
|
||
upsert := fmt.Sprintf("INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%d, %d, '%s') ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)", | ||
"_vt", now.UnixNano(), tw.tabletAlias.Uid, tw.keyspaceShard) | ||
|
@@ -61,7 +60,6 @@ func TestWriteHeartbeatOpen(t *testing.T) { | |
defer db.Close() | ||
|
||
tw := newSimpleTestWriter(db, nil) | ||
defer tw.stop() | ||
|
||
assert.Zero(t, tw.onDemandDuration) | ||
|
||
|
@@ -79,8 +77,17 @@ func TestWriteHeartbeatOpen(t *testing.T) { | |
<-time.After(3 * time.Second) | ||
assert.EqualValues(t, 1, writes.Get()) | ||
}) | ||
|
||
{ | ||
rateLimiter := tw.safeOnDemandRequestsRateLimiter() | ||
assert.Nil(t, rateLimiter) | ||
} | ||
tw.Open() | ||
defer tw.Close() | ||
{ | ||
rateLimiter := tw.safeOnDemandRequestsRateLimiter() | ||
assert.Nil(t, rateLimiter) | ||
} | ||
t.Run("open, heartbeats", func(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) | ||
defer cancel() | ||
|
@@ -109,7 +116,6 @@ func TestWriteHeartbeatDisabled(t *testing.T) { | |
defer db.Close() | ||
|
||
tw := newTestWriter(db, nil, tabletenv.Disable, 0) | ||
defer tw.stop() | ||
|
||
// Even though disabled, the writer will have an on-demand duration set. | ||
assert.Equal(t, defaultOnDemandDuration, tw.onDemandDuration) | ||
|
@@ -128,8 +134,16 @@ func TestWriteHeartbeatDisabled(t *testing.T) { | |
<-time.After(3 * time.Second) | ||
assert.EqualValues(t, 1, writes.Get()) | ||
}) | ||
{ | ||
rateLimiter := tw.safeOnDemandRequestsRateLimiter() | ||
assert.Nil(t, rateLimiter) | ||
} | ||
tw.Open() | ||
defer tw.Close() | ||
{ | ||
rateLimiter := tw.safeOnDemandRequestsRateLimiter() | ||
assert.NotNil(t, rateLimiter) | ||
} | ||
t.Run("open, no heartbeats", func(t *testing.T) { | ||
<-time.After(3 * time.Second) | ||
assert.EqualValues(t, 1, writes.Get()) | ||
|
@@ -158,6 +172,11 @@ func TestWriteHeartbeatDisabled(t *testing.T) { | |
<-time.After(3 * time.Second) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO we should use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's actually explicitly using a different value. It would be confusing to reuse If we reused Does that make sense? |
||
assert.EqualValues(t, currentWrites, writes.Get()) | ||
}) | ||
tw.Close() | ||
{ | ||
rateLimiter := tw.safeOnDemandRequestsRateLimiter() | ||
assert.Nil(t, rateLimiter) | ||
} | ||
} | ||
|
||
// TestWriteHeartbeatOnDemand tests that the heartbeat writer initiates leased heartbeats once opened, | ||
|
@@ -170,7 +189,6 @@ func TestWriteHeartbeatOnDemand(t *testing.T) { | |
defer db.Close() | ||
|
||
tw := newTestWriter(db, nil, tabletenv.Heartbeat, onDemandDuration) | ||
defer tw.stop() | ||
|
||
assert.Equal(t, onDemandDuration, tw.onDemandDuration) | ||
|
||
|
@@ -188,8 +206,16 @@ func TestWriteHeartbeatOnDemand(t *testing.T) { | |
<-time.After(3 * time.Second) | ||
assert.EqualValues(t, 1, writes.Get()) | ||
}) | ||
{ | ||
rateLimiter := tw.safeOnDemandRequestsRateLimiter() | ||
assert.Nil(t, rateLimiter) | ||
} | ||
tw.Open() | ||
defer tw.Close() | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Personal preference, but I don't see the value that these code blocks add and use of them is pretty non-standard within Vitess. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll explain why I use this: it's allowing me to redeclare a variable (
It's a personal preference and I recognize this is not standard in Vitess. FWIW I've already spread that around quite a bit. |
||
rateLimiter := tw.safeOnDemandRequestsRateLimiter() | ||
assert.NotNil(t, rateLimiter) | ||
} | ||
t.Run("open, initial heartbeats", func(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.Background(), tw.onDemandDuration-time.Second) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason to use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. The reasoning is that I don't want to get past the expiration of the heartbeat lease. This function is checking:
If we go past the expiration, then we can wrongly condlude that heartbeats are not being produced when they should (they shouldn't, past the expiration). |
||
defer cancel() | ||
|
@@ -225,6 +251,11 @@ func TestWriteHeartbeatOnDemand(t *testing.T) { | |
<-time.After(3 * time.Second) | ||
assert.EqualValues(t, currentWrites, writes.Get()) | ||
}) | ||
tw.Close() | ||
{ | ||
rateLimiter := tw.safeOnDemandRequestsRateLimiter() | ||
assert.Nil(t, rateLimiter) | ||
} | ||
} | ||
|
||
func TestWriteHeartbeatError(t *testing.T) { | ||
|
@@ -233,7 +264,6 @@ func TestWriteHeartbeatError(t *testing.T) { | |
|
||
now := time.Now() | ||
tw := newSimpleTestWriter(db, &now) | ||
defer tw.stop() | ||
|
||
writes.Reset() | ||
writeErrors.Reset() | ||
|
@@ -247,7 +277,6 @@ func TestWriteHeartbeatError(t *testing.T) { | |
func TestCloseWhileStuckWriting(t *testing.T) { | ||
db := fakesqldb.New(t) | ||
tw := newSimpleTestWriter(db, nil) | ||
defer tw.stop() | ||
|
||
tw.isOpen = true | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO using the atomic.Pointer value here (assuming there aren't good reasons to switch to that type) would be clearer and safer too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!