From c9fce8ad99887af091025cc0c4feb93bdd2c61ad Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Sat, 19 Oct 2024 12:03:12 +0530 Subject: [PATCH 1/8] test: add a test for premature buffering Signed-off-by: Manan Gupta --- .../reparent/newfeaturetest/reparent_test.go | 66 +++++++++++++++++++ go/test/endtoend/reparent/utils/utils.go | 45 +++++++++++++ 2 files changed, 111 insertions(+) diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index ad798d61792..4d23aabf1a0 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -19,7 +19,10 @@ package newfeaturetest import ( "context" "fmt" + "math/rand/v2" + "sync" "testing" + "time" "github.com/stretchr/testify/require" @@ -177,3 +180,66 @@ func TestERSWithWriteInPromoteReplica(t *testing.T) { _, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s") require.NoError(t, err, "ERS should not fail even if there is a sidecardb change") } + +func TestSimultaneousPRS(t *testing.T) { + defer cluster.PanicHandler(t) + clusterInstance := utils.SetupShardedReparentCluster(t, "semi_sync") + defer utils.TeardownCluster(clusterInstance) + + // Start by reparenting all the shards to the first tablet. + keyspace := clusterInstance.Keyspaces[0] + shards := keyspace.Shards + for _, shard := range shards { + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias) + require.NoError(t, err) + } + + rowCount := 1000 + vtParams := clusterInstance.GetVTParams(keyspace.Name) + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + // Now, we need to insert some data into the cluster. + for i := 1; i <= rowCount; i++ { + _, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false) + require.NoError(t, err) + } + + // Now we start a goroutine that continues to read the data until we've finished the test. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + go func() { + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + return + } + // We're running queries every 100 millisecond and verifying the results are all correct. + res, err := conn.ExecuteFetch(utils.GetSelectionQuery(), rowCount+10, false) + require.NoError(t, err) + require.Len(t, res.Rows, rowCount) + }() + } + } + }() + + // Now, we run go routines to run PRS calls on all the shards simultaneously. 
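A quick aside on the fan-out that follows: the same shape can be written so that a failed reparent is returned to the test goroutine rather than asserted inside the workers. A minimal, self-contained sketch, assuming a hypothetical reparentShard stand-in for the PlannedReparentShard call and golang.org/x/sync/errgroup:

```go
package main

import (
	"fmt"
	"math/rand/v2"
	"time"

	"golang.org/x/sync/errgroup"
)

// reparentShard stands in for clusterInstance.VtctldClientProcess.PlannedReparentShard.
func reparentShard(shard string) error {
	fmt.Println("reparenting", shard)
	return nil
}

func main() {
	shards := []string{"-40", "40-80", "80-"}
	var eg errgroup.Group
	for _, shard := range shards {
		eg.Go(func() error {
			// Stagger the calls so the concurrent PRS operations overlap in a
			// different order on every run.
			time.Sleep(time.Duration(rand.IntN(6)) * time.Second)
			return reparentShard(shard)
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("reparent failed:", err)
	}
}
```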
+ wg := sync.WaitGroup{} + for _, shard := range shards { + wg.Add(1) + go func() { + time.Sleep(time.Second * time.Duration(rand.IntN(6))) + defer wg.Done() + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[1].Alias) + require.NoError(t, err) + }() + } + wg.Wait() + cancel() +} diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 0d3eddc0464..6b63f8487e3 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -74,6 +74,51 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalPro return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync") } +// SetupShardedReparentCluster is used to setup a sharded cluster for testing +func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.LocalProcessCluster { + clusterInstance := cluster.NewCluster(cell1, Hostname) + // Start topo server + err := clusterInstance.StartTopo() + require.NoError(t, err) + + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--lock_tables_timeout", "5s", + "--track_schema_versions=true", + "--queryserver_enable_online_ddl=false") + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--enable_buffer", + // Long timeout in case failover is slow. + "--buffer_window", "10m", + "--buffer_max_failover_duration", "10m", + "--buffer_min_time_between_failovers", "20m", + ) + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: sqlSchema, + VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`, + DurabilityPolicy: durability, + } + err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false) + require.NoError(t, err) + + // Start Vtgate + err = clusterInstance.StartVtgate() + require.NoError(t, err) + return clusterInstance +} + +// GetInsertQuery returns a built insert query to insert a row. +func GetInsertQuery(idx int) string { + return fmt.Sprintf(insertSQL, idx, idx) +} + +// GetSelectionQuery returns a built selection query read the data. +func GetSelectionQuery() string { + return `select * from vt_insert_test` +} + // TeardownCluster is used to teardown the reparent cluster. 
When // run in a CI environment -- which is considered true when the // "CI" env variable is set to "true" -- the teardown also removes From c597c44ddd9c9389a6b0d11e31c1414fd79f43c7 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 21 Oct 2024 16:24:31 +0530 Subject: [PATCH 2/8] test: update test to reproduce the buffering problem Signed-off-by: Manan Gupta --- .../reparent/newfeaturetest/reparent_test.go | 80 +++++++++---------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index 4d23aabf1a0..5f69c9b7556 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -19,10 +19,8 @@ package newfeaturetest import ( "context" "fmt" - "math/rand/v2" "sync" "testing" - "time" "github.com/stretchr/testify/require" @@ -181,11 +179,17 @@ func TestERSWithWriteInPromoteReplica(t *testing.T) { require.NoError(t, err, "ERS should not fail even if there is a sidecardb change") } -func TestSimultaneousPRS(t *testing.T) { +func TestBufferingWithMultipleDisruptions(t *testing.T) { defer cluster.PanicHandler(t) clusterInstance := utils.SetupShardedReparentCluster(t, "semi_sync") defer utils.TeardownCluster(clusterInstance) + // Stop all VTOrc instances, so that they don't interfere with the test. + for _, vtorc := range clusterInstance.VTOrcProcesses { + err := vtorc.TearDown() + require.NoError(t, err) + } + // Start by reparenting all the shards to the first tablet. keyspace := clusterInstance.Keyspaces[0] shards := keyspace.Shards @@ -194,52 +198,40 @@ func TestSimultaneousPRS(t *testing.T) { require.NoError(t, err) } - rowCount := 1000 - vtParams := clusterInstance.GetVTParams(keyspace.Name) - conn, err := mysql.Connect(context.Background(), &vtParams) - require.NoError(t, err) - // Now, we need to insert some data into the cluster. - for i := 1; i <= rowCount; i++ { - _, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false) - require.NoError(t, err) - } + // We simulate start of external reparent or a PRS where the healthcheck update from the tablet gets lost in transit + // to vtgate by just setting the primary read only. This is also why we needed to shutdown all VTOrcs, so that they don't + // fix this. + //utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0]) + //utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0]) - // Now we start a goroutine that continues to read the data until we've finished the test. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - tick := time.NewTicker(100 * time.Millisecond) - defer tick.Stop() - for { - select { - case <-ctx.Done(): - return - case <-tick.C: - go func() { - conn, err := mysql.Connect(context.Background(), &vtParams) - if err != nil { - return - } - // We're running queries every 100 millisecond and verifying the results are all correct. - res, err := conn.ExecuteFetch(utils.GetSelectionQuery(), rowCount+10, false) - require.NoError(t, err) - require.Len(t, res.Rows, rowCount) - }() - } - } - }() - - // Now, we run go routines to run PRS calls on all the shards simultaneously. wg := sync.WaitGroup{} - for _, shard := range shards { + rowCount := 10 + vtParams := clusterInstance.GetVTParams(keyspace.Name) + // We now spawn writes for a bunch of go routines. 
+ // The ones going to shard 1 and shard 2 should block, since + // they're in the midst of a reparenting operation (as seen by the buffering code). + for i := 1; i <= rowCount; i++ { wg.Add(1) - go func() { - time.Sleep(time.Second * time.Duration(rand.IntN(6))) + go func(i int) { defer wg.Done() - err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[1].Alias) + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + return + } + _, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false) require.NoError(t, err) - }() + }(i) } + + // Now, run a PRS call on the last shard. This shouldn't unbuffer the queries that are buffered for shards 1 and 2 + // since the disruption on the two shards hasn't stopped. + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias) + require.NoError(t, err) + // We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate. + //time.Sleep(1 * time.Second) + // Finally, we'll now simulate the 2 shards being healthy again by setting them back to read-write. + //utils.RunSQL(context.Background(), t, "set global read_only=0", shards[0].Vttablets[0]) + //utils.RunSQL(context.Background(), t, "set global read_only=0", shards[1].Vttablets[0]) + // Wait for all the writes to have succeeded. wg.Wait() - cancel() } From 9a50dfc13f0d0dd54b93d294748b1d3cb01a53f3 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 21 Oct 2024 21:46:40 +0530 Subject: [PATCH 3/8] test: uncomment lines that were accidentally commented Signed-off-by: Manan Gupta --- .../reparent/newfeaturetest/reparent_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index 5f69c9b7556..eed79160952 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/stretchr/testify/require" @@ -201,8 +202,8 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) { // We simulate start of external reparent or a PRS where the healthcheck update from the tablet gets lost in transit // to vtgate by just setting the primary read only. This is also why we needed to shutdown all VTOrcs, so that they don't // fix this. - //utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0]) - //utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0]) + utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0]) + utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0]) wg := sync.WaitGroup{} rowCount := 10 @@ -218,6 +219,7 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) { if err != nil { return } + defer conn.Close() _, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false) require.NoError(t, err) }(i) @@ -228,10 +230,10 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) { err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias) require.NoError(t, err) // We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate. 
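An aside on the one-second pause re-enabled just below: a small polling helper of roughly this shape (hypothetical, not part of the patch; the condition it polls, for example a vtgate buffering stat, is left abstract) keeps the wait from racing slower CI machines:

```go
package utils

import (
	"testing"
	"time"
)

// waitUntil polls cond until it returns true or the timeout expires.
// Purely illustrative; the test below uses a fixed time.Sleep instead.
func waitUntil(t *testing.T, timeout time.Duration, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
	t.Fatalf("condition not met within %v", timeout)
}
```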
- //time.Sleep(1 * time.Second) + time.Sleep(1 * time.Second) // Finally, we'll now simulate the 2 shards being healthy again by setting them back to read-write. - //utils.RunSQL(context.Background(), t, "set global read_only=0", shards[0].Vttablets[0]) - //utils.RunSQL(context.Background(), t, "set global read_only=0", shards[1].Vttablets[0]) + utils.RunSQL(context.Background(), t, "set global read_only=0", shards[0].Vttablets[0]) + utils.RunSQL(context.Background(), t, "set global read_only=0", shards[1].Vttablets[0]) // Wait for all the writes to have succeeded. wg.Wait() } From 9aaab35c0ca6dd26e6ade09903c7c31b78cb6948 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 25 Oct 2024 15:38:54 +0530 Subject: [PATCH 4/8] feat: fix the problem by adding a new field in the shard state Signed-off-by: Manan Gupta --- .../reparent/newfeaturetest/reparent_test.go | 8 ++- go/test/endtoend/reparent/utils/utils.go | 2 + go/vt/discovery/keyspace_events.go | 67 ++++++++++++++++++- go/vt/vtgate/buffer/buffer.go | 16 ++++- go/vt/vtgate/buffer/buffer_helper_test.go | 2 +- go/vt/vtgate/buffer/buffer_test.go | 22 +++--- go/vt/vtgate/buffer/shard_buffer.go | 20 +++++- go/vt/vtgate/buffer/variables_test.go | 2 +- go/vt/vtgate/tabletgateway.go | 2 +- 9 files changed, 116 insertions(+), 25 deletions(-) diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index eed79160952..b6f34af7294 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -231,9 +231,11 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) { require.NoError(t, err) // We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate. time.Sleep(1 * time.Second) - // Finally, we'll now simulate the 2 shards being healthy again by setting them back to read-write. - utils.RunSQL(context.Background(), t, "set global read_only=0", shards[0].Vttablets[0]) - utils.RunSQL(context.Background(), t, "set global read_only=0", shards[1].Vttablets[0]) + // Finally, we'll now make the 2 shards healthy again by running PRS. + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias) + require.NoError(t, err) + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias) + require.NoError(t, err) // Wait for all the writes to have succeeded. wg.Wait() } diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 6b63f8487e3..2a51262557b 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -83,6 +83,8 @@ func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.Local clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--lock_tables_timeout", "5s", + // Fast health checks help find corner cases. 
+ "--health_check_interval", "1s", "--track_schema_versions=true", "--queryserver_enable_online_ddl=false") clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 3de31ea6d9a..66953529dce 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -178,8 +178,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool { } type shardState struct { - target *querypb.Target - serving bool + target *querypb.Target + serving bool + // waitForReparent is used to tell the keyspace event watcher + // that this shard should be marked serving only after a reparent + // operation has succeeded. + waitForReparent bool externallyReparented int64 currentPrimary *topodatapb.TabletAlias } @@ -368,8 +372,32 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { // if the shard went from serving to not serving, or the other way around, the keyspace // is undergoing an availability event if sstate.serving != th.Serving { - sstate.serving = th.Serving kss.consistent = false + switch { + case th.Serving && sstate.waitForReparent: + // While waiting for a reparent, if we receive a serving primary, + // we should check if the primary term start time is greater than the externally reparented time. + // We mark the shard serving only if it is. This is required so that we don't prematurely stop + // buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the + // same old primary tablet that has already been turned read-only. + if th.PrimaryTermStartTime > sstate.externallyReparented { + sstate.waitForReparent = false + sstate.serving = true + } + case th.Serving && !sstate.waitForReparent: + sstate.serving = true + case !th.Serving: + sstate.serving = false + // Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait + // for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop + // buffering when we receive a serving healthcheck from the primary that is being demoted. + // However, if we receive a non-serving check, then we know that we won't receive any more serving + // healthchecks anymore until reparent finishes. Specifically, this helps us when PRS fails, but + // stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote + // the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop + // buffering for that case. + sstate.waitForReparent = false + } } // if the primary for this shard has been externally reparented, we're undergoing a failover, @@ -784,3 +812,36 @@ func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, } } } + +// MarkShardNotServing marks the given shard not serving. +// We use this when we start buffering for a given shard. This helps +// coordinate between the sharding logic and the keyspace event watcher. +// We take in a boolean as well to tell us whether this error is because +// a reparent is ongoing. If it is, we also mark the shard to wait for a reparent. +// The return argument is whether the shard was found and marked not serving successfully or not. +func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspace string, shard string, isReparentErr bool) bool { + kss := kew.getKeyspaceStatus(ctx, keyspace) + if kss == nil { + // Only happens if the keyspace was deleted. 
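Stepping back to the onHealthCheck change above: the serving/waitForReparent gating reads well as a small pure function. A distilled, self-contained sketch (type and field names are illustrative; updating externallyReparented from the health check itself is elided):

```go
package main

import "fmt"

type primaryHealth struct {
	serving              bool
	primaryTermStartTime int64
}

type shardGate struct {
	serving              bool
	waitForReparent      bool
	externallyReparented int64
}

// apply returns the gate's state after seeing one primary health check.
func (g shardGate) apply(h primaryHealth) shardGate {
	switch {
	case h.serving && g.waitForReparent:
		// Only a newer primary term may end the wait; a stale "serving"
		// health check from the demoted primary is ignored.
		if h.primaryTermStartTime > g.externallyReparented {
			g.waitForReparent = false
			g.serving = true
		}
	case h.serving:
		g.serving = true
	default:
		// A non-serving primary health check stops serving and also clears
		// the wait, so a gracefully failed PRS can still end buffering later.
		g.serving = false
		g.waitForReparent = false
	}
	return g
}

func main() {
	buffering := shardGate{waitForReparent: true, externallyReparented: 10}
	fmt.Println(buffering.apply(primaryHealth{serving: true, primaryTermStartTime: 10})) // demoted primary: still not serving
	fmt.Println(buffering.apply(primaryHealth{serving: true, primaryTermStartTime: 20})) // new primary: serving again
}
```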
+ return false + } + kss.mu.Lock() + defer kss.mu.Unlock() + sstate := kss.shards[shard] + if sstate == nil { + // This only happens if the shard is deleted, or if + // the keyspace event watcher hasn't seen the shard at all. + return false + } + // Mark the keyspace inconsistent and the shard not serving. + kss.consistent = false + sstate.serving = false + if isReparentErr { + // If the error was triggered because a reparent operation has started. + // We mark the shard to wait for a reparent to finish before marking it serving. + // This is required to prevent premature stopping of buffering if we receive + // a serving healthcheck from a primary that is being demoted. + sstate.waitForReparent = true + } + return true +} diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index 260fb272544..126a3a8826d 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -94,6 +94,18 @@ func CausedByFailover(err error) bool { return isFailover } +// IsErrorDueToReparenting is a stronger check than CausedByFailover, meant to return +// if the failure is caused because of a reparent. +func IsErrorDueToReparenting(err error) bool { + if vterrors.Code(err) != vtrpcpb.Code_CLUSTER_EVENT { + return false + } + if strings.Contains(err.Error(), ClusterEventReshardingInProgress) { + return false + } + return true +} + // for debugging purposes func getReason(err error) string { for _, ce := range ClusterEvents { @@ -175,7 +187,7 @@ func (b *Buffer) GetConfig() *Config { // It returns an error if buffering failed (e.g. buffer full). // If it does not return an error, it may return a RetryDoneFunc which must be // called after the request was retried. -func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) { +func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) { // If an err is given, it must be related to a failover. // We never buffer requests with other errors. if err != nil && !CausedByFailover(err) { @@ -192,7 +204,7 @@ func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, requestsSkipped.Add([]string{keyspace, shard, skippedDisabled}, 1) return nil, nil } - return sb.waitForFailoverEnd(ctx, keyspace, shard, err) + return sb.waitForFailoverEnd(ctx, keyspace, shard, kev, err) } func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) { diff --git a/go/vt/vtgate/buffer/buffer_helper_test.go b/go/vt/vtgate/buffer/buffer_helper_test.go index 2deb460fc39..1276f0cd751 100644 --- a/go/vt/vtgate/buffer/buffer_helper_test.go +++ b/go/vt/vtgate/buffer/buffer_helper_test.go @@ -50,7 +50,7 @@ func issueRequestAndBlockRetry(ctx context.Context, t *testing.T, b *Buffer, err bufferingStopped := make(chan error) go func() { - retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, failoverErr) + retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, nil, failoverErr) if err != nil { bufferingStopped <- err } diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go index c730a8336d1..591647dc5d7 100644 --- a/go/vt/vtgate/buffer/buffer_test.go +++ b/go/vt/vtgate/buffer/buffer_test.go @@ -120,7 +120,7 @@ func testBuffering1WithOptions(t *testing.T, fail failover, concurrency int) { } // Subsequent requests with errors not related to the failover are not buffered. 
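Because the test updates below mostly thread a nil watcher through the new parameter, here is a sketch of what the signature change means for a real caller (illustrative only; retryAfterFailover is a made-up name, and the actual call site is the tabletgateway.go hunk later in this patch):

```go
package example

import (
	"context"

	"vitess.io/vitess/go/vt/discovery"
	querypb "vitess.io/vitess/go/vt/proto/query"
	"vitess.io/vitess/go/vt/vtgate/buffer"
)

// retryAfterFailover shows the caller-side contract: pass the keyspace event
// watcher in, and if a RetryDoneFunc comes back, call it once the retried
// request has finished so the buffer can account for the drain.
func retryAfterFailover(ctx context.Context, b *buffer.Buffer, kev *discovery.KeyspaceEventWatcher,
	target *querypb.Target, lastErr error) error {
	retryDone, bufferErr := b.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, kev, lastErr)
	if bufferErr != nil {
		// For example, the buffer was full and nothing could be evicted.
		return bufferErr
	}
	if retryDone != nil {
		defer retryDone()
	}
	// ... retry the request against the (possibly new) primary here ...
	return nil
}
```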
- if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil { t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone) } @@ -168,7 +168,7 @@ func testBuffering1WithOptions(t *testing.T, fail failover, concurrency int) { } // Second failover: Buffering is skipped because last failover is too recent. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("subsequent failovers must be skipped due to -buffer_min_time_between_failovers setting. err: %v retryDone: %v", err, retryDone) } if got, want := requestsSkipped.Counts()[statsKeyJoinedLastFailoverTooRecent], int64(1); got != want { @@ -226,7 +226,7 @@ func testDryRun1(t *testing.T, fail failover) { b := New(cfg) // Request does not get buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests must not be buffered during dry-run. err: %v retryDone: %v", err, retryDone) } // But the internal state changes though. @@ -272,10 +272,10 @@ func testPassthrough1(t *testing.T, fail failover) { b := New(cfg) - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil { t.Fatalf("requests with no error must never be buffered. err: %v retryDone: %v", err, retryDone) } - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil { t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone) } @@ -311,7 +311,7 @@ func testLastReparentTooRecentBufferingSkipped1(t *testing.T, fail failover) { now = now.Add(1 * time.Second) fail(b, newPrimary, keyspace, shard, now) - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests where the failover end was recently detected before the start must not be buffered. err: %v retryDone: %v", err, retryDone) } if err := waitForPoolSlots(b, cfg.Size); err != nil { @@ -408,10 +408,10 @@ func testPassthroughDuringDrain1(t *testing.T, fail failover) { } // Requests during the drain will be passed through and not buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil { t.Fatalf("requests with no error must not be buffered during a drain. 
err: %v retryDone: %v", err, retryDone) } - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests with failover errors must not be buffered during a drain. err: %v retryDone: %v", err, retryDone) } @@ -443,7 +443,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) { b := New(cfg) ignoredKeyspace := "ignored_ks" - if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests for ignored keyspaces must not be buffered. err: %v retryDone: %v", err, retryDone) } statsKeyJoined := strings.Join([]string{ignoredKeyspace, shard, skippedDisabled}, ".") @@ -452,7 +452,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) { } ignoredShard := "ff-" - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests for ignored shards must not be buffered. err: %v retryDone: %v", err, retryDone) } if err := waitForPoolSlots(b, cfg.Size); err != nil { @@ -634,7 +634,7 @@ func testEvictionNotPossible1(t *testing.T, fail failover) { // Newer requests of the second failover cannot evict anything because // they have no entries buffered. - retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, failoverErr) + retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, nil, failoverErr) if bufferErr == nil || retryDone != nil { t.Fatalf("buffer should have returned an error because it's full: err: %v retryDone: %v", bufferErr, retryDone) } diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index b0764c2ad91..df918865970 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -137,7 +137,7 @@ func (sb *shardBuffer) disabled() bool { return sb.mode == bufferModeDisabled } -func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) { +func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) { // We assume if err != nil then it's always caused by a failover. // Other errors must be filtered at higher layers. failoverDetected := err != nil @@ -211,7 +211,11 @@ func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard s return nil, nil } - sb.startBufferingLocked(err) + // Try to start buffering. If we're unsuccessful, then we exit early. 
+ if !sb.startBufferingLocked(ctx, kev, err) { + sb.mu.Unlock() + return nil, nil + } } if sb.mode == bufferModeDryRun { @@ -255,7 +259,16 @@ func (sb *shardBuffer) shouldBufferLocked(failoverDetected bool) bool { panic("BUG: All possible states must be covered by the switch expression above.") } -func (sb *shardBuffer) startBufferingLocked(err error) { +func (sb *shardBuffer) startBufferingLocked(ctx context.Context, kev *discovery.KeyspaceEventWatcher, err error) bool { + if kev != nil { + if !kev.MarkShardNotServing(ctx, sb.keyspace, sb.shard, IsErrorDueToReparenting(err)) { + // We failed to mark the shard as not serving. Do not buffer the request. + // This can happen if the keyspace has been deleted or if the keyspace even watcher + // hasn't yet seen the shard. Keyspace event watcher might not stop buffering for this + // request at all until it times out. It's better to not buffer this request. + return false + } + } // Reset monitoring data from previous failover. lastRequestsInFlightMax.Set(sb.statsKey, 0) lastRequestsDryRunMax.Set(sb.statsKey, 0) @@ -281,6 +294,7 @@ func (sb *shardBuffer) startBufferingLocked(err error) { sb.buf.config.MaxFailoverDuration, errorsanitizer.NormalizeError(err.Error()), ) + return true } // logErrorIfStateNotLocked logs an error if the current state is not "state". diff --git a/go/vt/vtgate/buffer/variables_test.go b/go/vt/vtgate/buffer/variables_test.go index a0640bde9e4..30d2426c639 100644 --- a/go/vt/vtgate/buffer/variables_test.go +++ b/go/vt/vtgate/buffer/variables_test.go @@ -51,7 +51,7 @@ func TestVariablesAreInitialized(t *testing.T) { // Create a new buffer and make a call which will create the shardBuffer object. // After that, the variables should be initialized for that shard. b := New(NewDefaultConfig()) - _, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil /* err */) + _, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil, nil) if err != nil { t.Fatalf("buffer should just passthrough and not return an error: %v", err) } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 7ed10d3813b..c36c6981fa2 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -310,7 +310,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, // b) no transaction was created yet. if gw.buffer != nil && !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY { // The next call blocks if we should buffer during a failover. - retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, err) + retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, gw.kev, err) // Request may have been buffered. 
if retryDone != nil { From 344cde37bab2dd46331c481f429bd9329e3457b9 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 25 Oct 2024 16:23:09 +0530 Subject: [PATCH 5/8] test: fix test to also send a higher primary timestamp Signed-off-by: Manan Gupta --- go/vt/discovery/fake_healthcheck.go | 15 +++++++++++++++ go/vt/vtgate/tabletgateway_flaky_test.go | 3 ++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index d1bde350276..6ae7ee105c2 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -172,6 +172,21 @@ func (fhc *FakeHealthCheck) SetTabletType(tablet *topodatapb.Tablet, tabletType item.ts.Target.TabletType = tabletType } +// SetPrimaryTimestamp sets the primary timestamp for the given tablet +func (fhc *FakeHealthCheck) SetPrimaryTimestamp(tablet *topodatapb.Tablet, timestamp int64) { + if fhc.ch == nil { + return + } + fhc.mu.Lock() + defer fhc.mu.Unlock() + key := TabletToMapKey(tablet) + item, isPresent := fhc.items[key] + if !isPresent { + return + } + item.ts.PrimaryTermStartTime = timestamp +} + // Unsubscribe is not implemented. func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) { } diff --git a/go/vt/vtgate/tabletgateway_flaky_test.go b/go/vt/vtgate/tabletgateway_flaky_test.go index e74f613b682..d136542d176 100644 --- a/go/vt/vtgate/tabletgateway_flaky_test.go +++ b/go/vt/vtgate/tabletgateway_flaky_test.go @@ -234,6 +234,7 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) { hc.SetTabletType(primaryTablet, topodatapb.TabletType_REPLICA) hc.Broadcast(primaryTablet) hc.SetTabletType(replicaTablet, topodatapb.TabletType_PRIMARY) + hc.SetPrimaryTimestamp(replicaTablet, 100) // We set a higher timestamp than before to simulate a PRS. hc.SetServing(replicaTablet, true) hc.Broadcast(replicaTablet) @@ -245,7 +246,7 @@ outer: require.Fail(t, "timed out - could not verify the new primary") case <-time.After(10 * time.Millisecond): newPrimary, shouldBuffer := tg.kev.ShouldStartBufferingForTarget(ctx, target) - if newPrimary != nil && newPrimary.Uid == 1 && !shouldBuffer { + if newPrimary != nil && newPrimary.Uid == replicaTablet.Alias.Uid && !shouldBuffer { break outer } } From 93effed46e0faf1de68ffcae3dd8291d11650525 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 25 Oct 2024 16:54:12 +0530 Subject: [PATCH 6/8] feat: add tests for processing of health checks and also fix a bug Signed-off-by: Manan Gupta --- go/vt/discovery/keyspace_events.go | 20 ++- go/vt/discovery/keyspace_events_test.go | 228 ++++++++++++++++++++++++ 2 files changed, 239 insertions(+), 9 deletions(-) diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 66953529dce..20b4b4982f5 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -388,17 +388,19 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { sstate.serving = true case !th.Serving: sstate.serving = false - // Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait - // for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop - // buffering when we receive a serving healthcheck from the primary that is being demoted. - // However, if we receive a non-serving check, then we know that we won't receive any more serving - // healthchecks anymore until reparent finishes. 
Specifically, this helps us when PRS fails, but - // stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote - // the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop - // buffering for that case. - sstate.waitForReparent = false } } + if !th.Serving { + // Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait + // for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop + // buffering when we receive a serving healthcheck from the primary that is being demoted. + // However, if we receive a non-serving check, then we know that we won't receive any more serving + // healthchecks anymore until reparent finishes. Specifically, this helps us when PRS fails, but + // stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote + // the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop + // buffering for that case. + sstate.waitForReparent = false + } // if the primary for this shard has been externally reparented, we're undergoing a failover, // which is considered an availability event. update this shard to point it to the new tablet diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index df5696841f7..21e0167e5cf 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -414,6 +414,234 @@ func TestWaitForConsistentKeyspaces(t *testing.T) { } } +func TestOnHealthCheck(t *testing.T) { + testcases := []struct { + name string + ss *shardState + th *TabletHealth + wantServing bool + wantWaitForReparent bool + wantExternallyReparented int64 + wantUID uint32 + }{ + { + name: "Non primary tablet health ignored", + ss: &shardState{ + serving: false, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_REPLICA, + }, + Serving: true, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "Serving primary seen in non-serving shard", + ss: &shardState{ + serving: false, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "New serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "Old serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + 
externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: true, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "Old non-serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: false, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "New serving primary while already serving", + ss: &shardState{ + serving: true, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "Primary goes non serving", + ss: &shardState{ + serving: true, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: false, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, + } + + ksName := "ks" + shard := "-80" + kss := &keyspaceState{ + mu: sync.Mutex{}, + keyspace: ksName, + shards: make(map[string]*shardState), + } + // Adding this so that we don't run any topo calls from ensureConsistentLocked. 
+ kss.moveTablesState = &MoveTablesState{ + Typ: MoveTablesRegular, + State: MoveTablesSwitching, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + kss.shards[shard] = tt.ss + tt.th.Target.Keyspace = ksName + tt.th.Target.Shard = shard + kss.onHealthCheck(tt.th) + require.Equal(t, tt.wantServing, tt.ss.serving) + require.Equal(t, tt.wantWaitForReparent, tt.ss.waitForReparent) + require.Equal(t, tt.wantExternallyReparented, tt.ss.externallyReparented) + require.Equal(t, tt.wantUID, tt.ss.currentPrimary.Uid) + }) + } +} + type fakeTopoServer struct { } From 41dc00e286d54b4f7b9e9c4786853c776a084412 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 25 Oct 2024 17:00:53 +0530 Subject: [PATCH 7/8] test: add more unit tests Signed-off-by: Manan Gupta --- go/vt/vtgate/buffer/buffer_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go index 591647dc5d7..571b01d61a8 100644 --- a/go/vt/vtgate/buffer/buffer_test.go +++ b/go/vt/vtgate/buffer/buffer_test.go @@ -72,6 +72,32 @@ var ( } ) +func TestIsErrorDueToReparenting(t *testing.T) { + testcases := []struct { + err error + want bool + }{ + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReshardingInProgress), + want: false, + }, + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReparentInProgress), + want: true, + }, + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "The MySQL server is running with the --super-read-only option"), + want: true, + }, + } + for _, tt := range testcases { + t.Run(tt.err.Error(), func(t *testing.T) { + got := IsErrorDueToReparenting(tt.err) + assert.Equal(t, tt.want, got) + }) + } +} + func TestBuffering(t *testing.T) { testAllImplementations(t, func(t *testing.T, fail failover) { testBuffering1WithOptions(t, fail, 1) From 368d449f8d94af0fee14055b7f0eab236d7228f7 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 11 Nov 2024 11:37:14 +0530 Subject: [PATCH 8/8] feat: address review comments Signed-off-by: Manan Gupta --- go/vt/discovery/keyspace_events.go | 2 +- go/vt/vtgate/buffer/buffer.go | 4 ++-- go/vt/vtgate/buffer/buffer_test.go | 2 +- go/vt/vtgate/buffer/shard_buffer.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 20b4b4982f5..1cb44184778 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -395,7 +395,7 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { // for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop // buffering when we receive a serving healthcheck from the primary that is being demoted. // However, if we receive a non-serving check, then we know that we won't receive any more serving - // healthchecks anymore until reparent finishes. Specifically, this helps us when PRS fails, but + // health checks until reparent finishes. Specifically, this helps us when PRS fails, but // stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote // the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop // buffering for that case. 
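Before the rename in the next hunk, a quick recap of how the two error classifiers relate. The constants and helpers below are the real ones from the buffer package (where this sketch would have to live once the function is unexported); the print table itself is illustrative and not part of the patch:

```go
package buffer

import (
	"fmt"

	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
	"vitess.io/vitess/go/vt/vterrors"
)

// classificationSketch illustrates the intended relationship: a reparent-related
// cluster event is also a failover, while a resharding-in-progress event is a
// failover that must NOT flip the keyspace event watcher into waitForReparent mode.
func classificationSketch() {
	reparent := vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReparentInProgress)
	resharding := vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReshardingInProgress)

	fmt.Println(CausedByFailover(reparent), isErrorDueToReparenting(reparent))     // true true
	fmt.Println(CausedByFailover(resharding), isErrorDueToReparenting(resharding)) // true false
}
```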
diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index 126a3a8826d..dec83e2c78c 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -94,9 +94,9 @@ func CausedByFailover(err error) bool { return isFailover } -// IsErrorDueToReparenting is a stronger check than CausedByFailover, meant to return +// isErrorDueToReparenting is a stronger check than CausedByFailover, meant to return // if the failure is caused because of a reparent. -func IsErrorDueToReparenting(err error) bool { +func isErrorDueToReparenting(err error) bool { if vterrors.Code(err) != vtrpcpb.Code_CLUSTER_EVENT { return false } diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go index 571b01d61a8..fc326ce0ce5 100644 --- a/go/vt/vtgate/buffer/buffer_test.go +++ b/go/vt/vtgate/buffer/buffer_test.go @@ -92,7 +92,7 @@ func TestIsErrorDueToReparenting(t *testing.T) { } for _, tt := range testcases { t.Run(tt.err.Error(), func(t *testing.T) { - got := IsErrorDueToReparenting(tt.err) + got := isErrorDueToReparenting(tt.err) assert.Equal(t, tt.want, got) }) } diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index df918865970..e1f02bb7f0e 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -261,7 +261,7 @@ func (sb *shardBuffer) shouldBufferLocked(failoverDetected bool) bool { func (sb *shardBuffer) startBufferingLocked(ctx context.Context, kev *discovery.KeyspaceEventWatcher, err error) bool { if kev != nil { - if !kev.MarkShardNotServing(ctx, sb.keyspace, sb.shard, IsErrorDueToReparenting(err)) { + if !kev.MarkShardNotServing(ctx, sb.keyspace, sb.shard, isErrorDueToReparenting(err)) { // We failed to mark the shard as not serving. Do not buffer the request. // This can happen if the keyspace has been deleted or if the keyspace even watcher // hasn't yet seen the shard. Keyspace event watcher might not stop buffering for this