Skip to content

Commit

Permalink
Flakes: De-flake TestGatewayBufferingWhenPrimarySwitchesServingState (v…
Browse files Browse the repository at this point in the history
…itessio#14968)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored and arthurschreiber committed Nov 7, 2024
1 parent f30abe0 commit d18e303
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions go/vt/vtgate/tabletgateway_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/utils"

"vitess.io/vitess/go/mysql/collations"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/vtgate/buffer"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/buffer"
)

// TestGatewayBufferingWhenPrimarySwitchesServingState is used to test that the buffering mechanism buffers the queries when a primary goes to a non serving state and
Expand Down Expand Up @@ -64,6 +63,20 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
// add a primary tabelt which is serving
sbc := hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil)

bufferingWaitTimeout := 60 * time.Second
waitForBuffering := func(enabled bool) {
timer := time.NewTimer(bufferingWaitTimeout)
defer timer.Stop()
for _, buffering := tg.kev.PrimaryIsNotServing(ctx, target); buffering != enabled; _, buffering = tg.kev.PrimaryIsNotServing(ctx, target) {
select {
case <-timer.C:
require.Fail(t, "timed out waiting for buffering of enabled: %t", enabled)
default:
}
time.Sleep(10 * time.Millisecond)
}
}

// add a result to the sandbox connection
sqlResult1 := &sqltypes.Result{
Fields: []*querypb.Field{{
Expand Down Expand Up @@ -94,6 +107,8 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
// add another result to the sandbox connection
sbc.SetResults([]*sqltypes.Result{sqlResult1})

waitForBuffering(true)

// execute the query in a go routine since it should be buffered, and check that it eventually succeed
queryChan := make(chan struct{})
go func() {
Expand All @@ -102,17 +117,17 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
}()

// set the serving type for the primary tablet true and broadcast it so that the buffering code registers this change
// this should stop the buffering and the query executed in the go routine should work. This should be done with some delay so
// that we know that the query was buffered
time.Sleep(1 * time.Second)
// this should stop the buffering and the query executed in the go routine should work.
hc.SetServing(primaryTablet, true)
hc.Broadcast(primaryTablet)

waitForBuffering(false)

// wait for the query to execute before checking for results
select {
case <-queryChan:
require.NoError(t, err)
require.Equal(t, res, sqlResult1)
require.Equal(t, sqlResult1, res)
case <-time.After(15 * time.Second):
t.Fatalf("timed out waiting for query to execute")
}
Expand Down

0 comments on commit d18e303

Please sign in to comment.