Skip to content

Commit

Permalink
feat: fix the problem by adding a new field in the shard state
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Oct 25, 2024
1 parent 3cbe293 commit 9aaab35
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 25 deletions.
8 changes: 5 additions & 3 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,11 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) {
require.NoError(t, err)
// We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate.
time.Sleep(1 * time.Second)
// Finally, we'll now simulate the 2 shards being healthy again by setting them back to read-write.
utils.RunSQL(context.Background(), t, "set global read_only=0", shards[0].Vttablets[0])
utils.RunSQL(context.Background(), t, "set global read_only=0", shards[1].Vttablets[0])
// Finally, we'll now make the 2 shards healthy again by running PRS.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias)
require.NoError(t, err)
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias)
require.NoError(t, err)
// Wait for all the writes to have succeeded.
wg.Wait()
}
2 changes: 2 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.Local

clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--lock_tables_timeout", "5s",
// Fast health checks help find corner cases.
"--health_check_interval", "1s",
"--track_schema_versions=true",
"--queryserver_enable_online_ddl=false")
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
Expand Down
67 changes: 64 additions & 3 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
}

type shardState struct {
target *querypb.Target
serving bool
target *querypb.Target
serving bool
// waitForReparent is used to tell the keyspace event watcher
// that this shard should be marked serving only after a reparent
// operation has succeeded.
waitForReparent bool
externallyReparented int64
currentPrimary *topodatapb.TabletAlias
}
Expand Down Expand Up @@ -368,8 +372,32 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) {
// if the shard went from serving to not serving, or the other way around, the keyspace
// is undergoing an availability event
if sstate.serving != th.Serving {
sstate.serving = th.Serving
kss.consistent = false
switch {
case th.Serving && sstate.waitForReparent:
// While waiting for a reparent, if we receive a serving primary,
// we should check if the primary term start time is greater than the externally reparented time.
// We mark the shard serving only if it is. This is required so that we don't prematurely stop
// buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the
// same old primary tablet that has already been turned read-only.
if th.PrimaryTermStartTime > sstate.externallyReparented {
sstate.waitForReparent = false
sstate.serving = true
}
case th.Serving && !sstate.waitForReparent:
sstate.serving = true
case !th.Serving:
sstate.serving = false
// Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait
// for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop
// buffering when we receive a serving healthcheck from the primary that is being demoted.
// However, if we receive a non-serving check, then we know that we won't receive any more serving
// healthchecks anymore until reparent finishes. Specifically, this helps us when PRS fails, but
// stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote
// the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop
// buffering for that case.
sstate.waitForReparent = false
}
}

// if the primary for this shard has been externally reparented, we're undergoing a failover,
Expand Down Expand Up @@ -784,3 +812,36 @@ func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context,
}
}
}

// MarkShardNotServing marks the given shard not serving.
// We use this when we start buffering for a given shard. This helps
// coordinate between the sharding logic and the keyspace event watcher.
// We take in a boolean as well to tell us whether this error is because
// a reparent is ongoing. If it is, we also mark the shard to wait for a reparent.
// The return argument is whether the shard was found and marked not serving successfully or not.
func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspace string, shard string, isReparentErr bool) bool {
kss := kew.getKeyspaceStatus(ctx, keyspace)
if kss == nil {
// Only happens if the keyspace was deleted.
return false
}
kss.mu.Lock()
defer kss.mu.Unlock()
sstate := kss.shards[shard]
if sstate == nil {
// This only happens if the shard is deleted, or if
// the keyspace event watcher hasn't seen the shard at all.
return false
}
// Mark the keyspace inconsistent and the shard not serving.
kss.consistent = false
sstate.serving = false
if isReparentErr {
// If the error was triggered because a reparent operation has started.
// We mark the shard to wait for a reparent to finish before marking it serving.
// This is required to prevent premature stopping of buffering if we receive
// a serving healthcheck from a primary that is being demoted.
sstate.waitForReparent = true
}
return true
}
16 changes: 14 additions & 2 deletions go/vt/vtgate/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ func CausedByFailover(err error) bool {
return isFailover
}

// IsErrorDueToReparenting is a stronger check than CausedByFailover, meant to return
// if the failure is caused because of a reparent.
func IsErrorDueToReparenting(err error) bool {
if vterrors.Code(err) != vtrpcpb.Code_CLUSTER_EVENT {
return false
}
if strings.Contains(err.Error(), ClusterEventReshardingInProgress) {
return false
}
return true
}

// for debugging purposes
func getReason(err error) string {
for _, ce := range ClusterEvents {
Expand Down Expand Up @@ -175,7 +187,7 @@ func (b *Buffer) GetConfig() *Config {
// It returns an error if buffering failed (e.g. buffer full).
// If it does not return an error, it may return a RetryDoneFunc which must be
// called after the request was retried.
func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) {
func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) {
// If an err is given, it must be related to a failover.
// We never buffer requests with other errors.
if err != nil && !CausedByFailover(err) {
Expand All @@ -192,7 +204,7 @@ func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string,
requestsSkipped.Add([]string{keyspace, shard, skippedDisabled}, 1)
return nil, nil
}
return sb.waitForFailoverEnd(ctx, keyspace, shard, err)
return sb.waitForFailoverEnd(ctx, keyspace, shard, kev, err)
}

func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/buffer/buffer_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func issueRequestAndBlockRetry(ctx context.Context, t *testing.T, b *Buffer, err
bufferingStopped := make(chan error)

go func() {
retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, failoverErr)
retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, nil, failoverErr)
if err != nil {
bufferingStopped <- err
}
Expand Down
22 changes: 11 additions & 11 deletions go/vt/vtgate/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func testBuffering1WithOptions(t *testing.T, fail failover, concurrency int) {
}

// Subsequent requests with errors not related to the failover are not buffered.
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil {
t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone)
}

Expand Down Expand Up @@ -168,7 +168,7 @@ func testBuffering1WithOptions(t *testing.T, fail failover, concurrency int) {
}

// Second failover: Buffering is skipped because last failover is too recent.
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil {
t.Fatalf("subsequent failovers must be skipped due to -buffer_min_time_between_failovers setting. err: %v retryDone: %v", err, retryDone)
}
if got, want := requestsSkipped.Counts()[statsKeyJoinedLastFailoverTooRecent], int64(1); got != want {
Expand Down Expand Up @@ -226,7 +226,7 @@ func testDryRun1(t *testing.T, fail failover) {
b := New(cfg)

// Request does not get buffered.
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil {
t.Fatalf("requests must not be buffered during dry-run. err: %v retryDone: %v", err, retryDone)
}
// But the internal state changes though.
Expand Down Expand Up @@ -272,10 +272,10 @@ func testPassthrough1(t *testing.T, fail failover) {

b := New(cfg)

if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil {
t.Fatalf("requests with no error must never be buffered. err: %v retryDone: %v", err, retryDone)
}
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil {
t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone)
}

Expand Down Expand Up @@ -311,7 +311,7 @@ func testLastReparentTooRecentBufferingSkipped1(t *testing.T, fail failover) {
now = now.Add(1 * time.Second)
fail(b, newPrimary, keyspace, shard, now)

if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil {
t.Fatalf("requests where the failover end was recently detected before the start must not be buffered. err: %v retryDone: %v", err, retryDone)
}
if err := waitForPoolSlots(b, cfg.Size); err != nil {
Expand Down Expand Up @@ -408,10 +408,10 @@ func testPassthroughDuringDrain1(t *testing.T, fail failover) {
}

// Requests during the drain will be passed through and not buffered.
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil {
t.Fatalf("requests with no error must not be buffered during a drain. err: %v retryDone: %v", err, retryDone)
}
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil {
t.Fatalf("requests with failover errors must not be buffered during a drain. err: %v retryDone: %v", err, retryDone)
}

Expand Down Expand Up @@ -443,7 +443,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) {
b := New(cfg)

ignoredKeyspace := "ignored_ks"
if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, failoverErr); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, nil, failoverErr); err != nil || retryDone != nil {
t.Fatalf("requests for ignored keyspaces must not be buffered. err: %v retryDone: %v", err, retryDone)
}
statsKeyJoined := strings.Join([]string{ignoredKeyspace, shard, skippedDisabled}, ".")
Expand All @@ -452,7 +452,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) {
}

ignoredShard := "ff-"
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, failoverErr); err != nil || retryDone != nil {
if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, nil, failoverErr); err != nil || retryDone != nil {
t.Fatalf("requests for ignored shards must not be buffered. err: %v retryDone: %v", err, retryDone)
}
if err := waitForPoolSlots(b, cfg.Size); err != nil {
Expand Down Expand Up @@ -634,7 +634,7 @@ func testEvictionNotPossible1(t *testing.T, fail failover) {

// Newer requests of the second failover cannot evict anything because
// they have no entries buffered.
retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, failoverErr)
retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, nil, failoverErr)
if bufferErr == nil || retryDone != nil {
t.Fatalf("buffer should have returned an error because it's full: err: %v retryDone: %v", bufferErr, retryDone)
}
Expand Down
20 changes: 17 additions & 3 deletions go/vt/vtgate/buffer/shard_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (sb *shardBuffer) disabled() bool {
return sb.mode == bufferModeDisabled
}

func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) {
func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) {
// We assume if err != nil then it's always caused by a failover.
// Other errors must be filtered at higher layers.
failoverDetected := err != nil
Expand Down Expand Up @@ -211,7 +211,11 @@ func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard s
return nil, nil
}

sb.startBufferingLocked(err)
// Try to start buffering. If we're unsuccessful, then we exit early.
if !sb.startBufferingLocked(ctx, kev, err) {
sb.mu.Unlock()
return nil, nil
}
}

if sb.mode == bufferModeDryRun {
Expand Down Expand Up @@ -255,7 +259,16 @@ func (sb *shardBuffer) shouldBufferLocked(failoverDetected bool) bool {
panic("BUG: All possible states must be covered by the switch expression above.")
}

func (sb *shardBuffer) startBufferingLocked(err error) {
func (sb *shardBuffer) startBufferingLocked(ctx context.Context, kev *discovery.KeyspaceEventWatcher, err error) bool {
if kev != nil {
if !kev.MarkShardNotServing(ctx, sb.keyspace, sb.shard, IsErrorDueToReparenting(err)) {
// We failed to mark the shard as not serving. Do not buffer the request.
// This can happen if the keyspace has been deleted or if the keyspace even watcher
// hasn't yet seen the shard. Keyspace event watcher might not stop buffering for this
// request at all until it times out. It's better to not buffer this request.
return false
}
}
// Reset monitoring data from previous failover.
lastRequestsInFlightMax.Set(sb.statsKey, 0)
lastRequestsDryRunMax.Set(sb.statsKey, 0)
Expand All @@ -281,6 +294,7 @@ func (sb *shardBuffer) startBufferingLocked(err error) {
sb.buf.config.MaxFailoverDuration,
errorsanitizer.NormalizeError(err.Error()),
)
return true
}

// logErrorIfStateNotLocked logs an error if the current state is not "state".
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/buffer/variables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestVariablesAreInitialized(t *testing.T) {
// Create a new buffer and make a call which will create the shardBuffer object.
// After that, the variables should be initialized for that shard.
b := New(NewDefaultConfig())
_, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil /* err */)
_, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil, nil)
if err != nil {
t.Fatalf("buffer should just passthrough and not return an error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
// b) no transaction was created yet.
if gw.buffer != nil && !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY {
// The next call blocks if we should buffer during a failover.
retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, err)
retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, gw.kev, err)

// Request may have been buffered.
if retryDone != nil {
Expand Down

0 comments on commit 9aaab35

Please sign in to comment.