Skip to content

Commit

Permalink
refactor: rename the function and add more comments to make the code …
Browse files Browse the repository at this point in the history
…clearer

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 28, 2024
1 parent 9e04eda commit 34da819
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 38 deletions.
42 changes: 31 additions & 11 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,32 +669,52 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar
return ks.beingResharded(target.Shard)
}

// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is
// that the primary tablet for that shard is not serving. This is possible during a Planned
// Reparent Shard operation. Just as the operation completes, a new primary will be elected, and
// ShouldStartBufferingForTarget checks if we should be starting buffering for the given target.
// We check the following things before we start buffering -
// 1. The shard must have a primary.
// 2. The primary must be non-serving.
// 3. The keyspace must be marked inconsistent.
//
// This buffering is meant to kick in during a Planned Reparent Shard operation.
// As part of that operation the old primary will become non-serving. At that point
// this code should return true to start buffering requests.
// Just as the PRS operation completes, a new primary will be elected, and
// it will send its own healthcheck stating that it is serving. We should buffer requests until
// that point. There are use cases where people do not run with a Primary server at all, so we must
// that point.
//
// There are use cases where people do not run with a Primary server at all, so we must
// verify that we only start buffering when a primary was present, and it went not serving.
// The shard state keeps track of the current primary and the last externally reparented time, which
// we can use to determine that there was a serving primary which now became non serving. This is
// only possible in a DemotePrimary RPC which are only called from ERS and PRS. So buffering will
// stop when these operations succeed. We return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
// stop when these operations succeed. We also return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
if target.TabletType != topodatapb.TabletType_PRIMARY {
// We don't support buffering for any target tablet type other than the primary.
return nil, false
}
ks := kew.getKeyspaceStatus(ctx, target.Keyspace)
if ks == nil {
// If the keyspace status is nil, then the keyspace must be deleted.
// The user query is trying to access a keyspace that has been deleted.
// There is no reason to buffer this query.
return nil, false
}
ks.mu.Lock()
defer ks.mu.Unlock()
if state, ok := ks.shards[target.Shard]; ok {
// The first time we receive an update for a serving primary, we set the externallyReparented value and currentPrimary values.
// These never get reset, so the last two checks checking for them being non-empty is purely for defensive reasons, so that we don't
// return that the primary is not serving when there is no primary that the keyspace event watcher has seen yet.
// The reason this function returns if the Primary is not serving and not just if it is serving, because we want to very defensive in when we say
// the primary is not serving. This function is used to start buffering and we don't want to start buffering when we don't know for sure if the primary
// As described in the function comment, we only want to start buffering when all the following conditions are met -
// 1. The shard must have a primary. We check this by checking the currentPrimary and externallyReparented fields being non-empty.
// They are set the first time the shard registers an update from a serving primary and are never cleared out after that.
// If the user has configred vtgates to wait for the primary tablet healthchecks before starting query service, this condition
// will always be true.
// 2. The primary must be non-serving. We check this by checking the serving field in the shard state.
// When a primary becomes non-serving, it also marks the keyspace inconsistent. So the next check is only added
// for being defensive against any bugs.
// 3. The keyspace must be marked inconsistent. We check this by checking the consistent field in the keyspace state.
//
// The reason we need all the three checks is that we want to be very defensive in when we start buffering.
// We don't want to start buffering when we don't know for sure if the primary
// is not serving and we will receive an update that stops buffering soon.
return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
}
Expand Down
38 changes: 19 additions & 19 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ func TestKeyspaceEventTypes(t *testing.T) {
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)

type testCase struct {
name string
kss *keyspaceState
shardToCheck string
expectResharding bool
expectPrimaryNotServing bool
name string
kss *keyspaceState
shardToCheck string
expectResharding bool
expectShouldBuffer bool
}

testCases := []testCase{
Expand Down Expand Up @@ -189,9 +189,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-",
expectResharding: true,
expectPrimaryNotServing: false,
shardToCheck: "-",
expectResharding: true,
expectShouldBuffer: false,
},
{
name: "two to four resharding in progress",
Expand Down Expand Up @@ -250,9 +250,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-80",
expectResharding: true,
expectPrimaryNotServing: false,
shardToCheck: "-80",
expectResharding: true,
expectShouldBuffer: false,
},
{
name: "unsharded primary not serving",
Expand All @@ -276,9 +276,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-",
expectResharding: false,
expectPrimaryNotServing: true,
shardToCheck: "-",
expectResharding: false,
expectShouldBuffer: true,
},
{
name: "sharded primary not serving",
Expand Down Expand Up @@ -310,9 +310,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-80",
expectResharding: false,
expectPrimaryNotServing: true,
shardToCheck: "-80",
expectResharding: false,
expectShouldBuffer: true,
},
}

Expand All @@ -327,8 +327,8 @@ func TestKeyspaceEventTypes(t *testing.T) {
resharding := kew.TargetIsBeingResharded(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, resharding, tc.expectResharding, "TargetIsBeingResharded should return %t", tc.expectResharding)

_, primaryDown := kew.PrimaryIsNotServing(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, primaryDown, tc.expectPrimaryNotServing, "PrimaryIsNotServing should return %t", tc.expectPrimaryNotServing)
_, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, shouldBuffer, tc.expectShouldBuffer, "ShouldStartBufferingForTarget should return %t", tc.expectShouldBuffer)
})
}
}
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,13 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReshardingInProgress)
continue
}
primary, notServing := kev.PrimaryIsNotServing(ctx, target)
if notServing {
primary, shouldBuffer := kev.ShouldStartBufferingForTarget(ctx, target)
if shouldBuffer {
err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReparentInProgress)
continue
}
// if primary is serving, but we initially found no tablet, we're in an inconsistent state
// if the keyspace event manager doesn't think we should buffer queries, and also sees a primary tablet,
// but we initially found no tablet, we're in an inconsistent state
// we then retry the entire loop
if primary != nil {
err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "inconsistent state detected, primary is serving but initially found no available tablet")
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/tabletgateway_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
waitForBuffering := func(enabled bool) {
timer := time.NewTimer(bufferingWaitTimeout)
defer timer.Stop()
for _, buffering := tg.kev.PrimaryIsNotServing(ctx, target); buffering != enabled; _, buffering = tg.kev.PrimaryIsNotServing(ctx, target) {
for _, buffering := tg.kev.ShouldStartBufferingForTarget(ctx, target); buffering != enabled; _, buffering = tg.kev.ShouldStartBufferingForTarget(ctx, target) {
select {
case <-timer.C:
require.Fail(t, "timed out waiting for buffering of enabled: %t", enabled)
Expand Down Expand Up @@ -213,8 +213,8 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) {
hc.Broadcast(primaryTablet)

require.Len(t, tg.hc.GetHealthyTabletStats(target), 0, "GetHealthyTabletStats has tablets even though it shouldn't")
_, isNotServing := tg.kev.PrimaryIsNotServing(ctx, target)
require.True(t, isNotServing)
_, shouldStartBuffering := tg.kev.ShouldStartBufferingForTarget(ctx, target)
require.True(t, shouldStartBuffering)

// add a result to the sandbox connection of the new primary
sbcReplica.SetResults([]*sqltypes.Result{sqlResult1})
Expand Down Expand Up @@ -244,8 +244,8 @@ outer:
case <-timeout:
require.Fail(t, "timed out - could not verify the new primary")
case <-time.After(10 * time.Millisecond):
newPrimary, notServing := tg.kev.PrimaryIsNotServing(ctx, target)
if newPrimary != nil && newPrimary.Uid == 1 && !notServing {
newPrimary, shouldBuffer := tg.kev.ShouldStartBufferingForTarget(ctx, target)
if newPrimary != nil && newPrimary.Uid == 1 && !shouldBuffer {
break outer
}
}
Expand Down

0 comments on commit 34da819

Please sign in to comment.