Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition that prevents queries from being buffered after vtgate startup #16655

Merged
merged 10 commits into from
Aug 29, 2024
Merged
58 changes: 56 additions & 2 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package discovery
import (
"context"
"fmt"
"slices"
"sync"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
Expand All @@ -37,6 +39,11 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

var (
// waitConsistentKeyspacesCheck is the amount of time to wait between successive checks
// of whether the keyspaces have become consistent. WaitForConsistentKeyspaces sleeps for
// this long between polling passes.
waitConsistentKeyspacesCheck = 100 * time.Millisecond
)

// KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents
// for all keyspaces in a Vitess cell and notifies listeners when the events have been resolved.
// Right now this is capable of detecting the end of failovers, both planned and unplanned,
Expand Down Expand Up @@ -683,8 +690,12 @@ func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target
ks.mu.Lock()
defer ks.mu.Unlock()
if state, ok := ks.shards[target.Shard]; ok {
// If the primary tablet was present then externallyReparented will be non-zero and
// currentPrimary will be not nil.
// The first time we receive an update for a serving primary, we set the externallyReparented value and currentPrimary values.
// These never get reset, so the last two checks checking for them being non-empty is purely for defensive reasons, so that we don't
// return that the primary is not serving when there is no primary that the keyspace event watcher has seen yet.
// This function reports whether the primary is not serving, rather than just whether it is serving, because we want to be very defensive about when we say
// the primary is not serving. This function is used to start buffering, and we don't want to start buffering when we don't know for sure that the primary
// is not serving and that we will soon receive an update that stops buffering.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}
return nil, false
Expand All @@ -703,3 +714,46 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
}
return servingKeyspaces
}

// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent.
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error {
// We don't want to change the original keyspace list that we receive so we clone it
// before we empty it elements down below.
keyspaces := slices.Clone(ksList)
for {
// We empty keyspaces as we find them to be consistent.
allConsistent := true
for i, ks := range keyspaces {
if ks == "" {
continue
}

// Get the keyspace status and see it is consistent yet or not.
kss := kew.getKeyspaceStatus(ctx, ks)
// If kss is nil, then it must be deleted. In that case too it is fine for us to consider
// it consistent since the keyspace has been deleted.
if kss == nil || kss.consistent {
keyspaces[i] = ""
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
} else {
allConsistent = false
}
}

if allConsistent {
// all the keyspaces are consistent.
return nil
}

// Unblock after the sleep or when the context has expired.
select {
case <-ctx.Done():
for _, ks := range keyspaces {
if ks != "" {
log.Infof("keyspace %v didn't become consistent", ks)
}
}
return ctx.Err()
case <-time.After(waitConsistentKeyspacesCheck):
}
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}
}
81 changes: 81 additions & 0 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,87 @@ func TestKeyspaceEventTypes(t *testing.T) {
}
}

// TestWaitForConsistentKeyspaces checks what WaitForConsistentKeyspaces returns after a
// single polling pass, for keyspaces that are consistent, inconsistent or deleted.
func TestWaitForConsistentKeyspaces(t *testing.T) {
	type testCase struct {
		name        string
		ksList      []string
		ksMap       map[string]*keyspaceState
		errExpected string
	}

	cases := []testCase{
		{
			name:   "Empty keyspace list",
			ksList: nil,
			ksMap: map[string]*keyspaceState{
				"ks1": {},
			},
			errExpected: "",
		},
		{
			name:   "All keyspaces consistent",
			ksList: []string{"ks1", "ks2"},
			ksMap: map[string]*keyspaceState{
				"ks1": {consistent: true},
				"ks2": {consistent: true},
			},
			errExpected: "",
		},
		{
			name:   "One keyspace inconsistent",
			ksList: []string{"ks1", "ks2"},
			ksMap: map[string]*keyspaceState{
				"ks1": {consistent: true},
				"ks2": {consistent: false},
			},
			errExpected: "context canceled",
		},
		{
			name:   "One deleted keyspace - consistent",
			ksList: []string{"ks1", "ks2"},
			ksMap: map[string]*keyspaceState{
				"ks1": {consistent: true},
				"ks2": {deleted: true},
			},
			errExpected: "",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// The context is canceled before the call so that the watcher never sleeps:
			// only the outcome of the very first consistency pass is observed, which
			// keeps the unit test from waiting.
			ctx, cancel := context.WithCancel(context.Background())
			cancel()

			watcher := &KeyspaceEventWatcher{
				keyspaces: tc.ksMap,
				mu:        sync.Mutex{},
				ts:        &fakeTopoServer{},
			}

			err := watcher.WaitForConsistentKeyspaces(ctx, tc.ksList)
			if tc.errExpected == "" {
				require.NoError(t, err)
				return
			}
			require.ErrorContains(t, err, tc.errExpected)
		})
	}
}

// fakeTopoServer is a stateless stand-in that tests plug into the ts field of a
// KeyspaceEventWatcher.
type fakeTopoServer struct{}

Expand Down
14 changes: 7 additions & 7 deletions go/vt/srvtopo/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package srvtopo

import (
"sync"

"context"
"sync"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
Expand All @@ -29,15 +28,16 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// FindAllTargets goes through all serving shards in the topology for the provided keyspaces
// FindAllTargetsAndKeyspaces goes through all serving shards in the topology for the provided keyspaces
// and tablet types. If no keyspaces are provided all available keyspaces in the topo are
// fetched. It returns one Target object per keyspace/shard/matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
// It also returns all the keyspaces that it found.
func FindAllTargetsAndKeyspaces(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, []string, error) {
var err error
if len(keyspaces) == 0 {
keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true)
if err != nil {
return nil, err
return nil, nil, err
}
}

Expand Down Expand Up @@ -95,8 +95,8 @@ func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []str
}
wg.Wait()
if errRecorder.HasErrors() {
return nil, errRecorder.Error()
return nil, nil, errRecorder.Error()
}

return targets, nil
return targets, keyspaces, nil
}
41 changes: 25 additions & 16 deletions go/vt/srvtopo/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (a TargetArray) Less(i, j int) bool {
return a[i].TabletType < a[j].TabletType
}

func TestFindAllTargets(t *testing.T) {
func TestFindAllTargetsAndKeyspaces(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := memorytopo.NewServer(ctx, "cell1", "cell2")
Expand All @@ -65,9 +65,10 @@ func TestFindAllTargets(t *testing.T) {
rs := NewResilientServer(ctx, ts, counts)

// No keyspace / shards.
ks, err := FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
targets, ksList, err := FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.Len(t, ks, 0)
assert.Len(t, targets, 0)
assert.EqualValues(t, []string{"test_keyspace"}, ksList)

// Add one.
assert.NoError(t, ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace", &topodatapb.SrvKeyspace{
Expand All @@ -84,7 +85,7 @@ func TestFindAllTargets(t *testing.T) {
}))

// Get it.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Expand All @@ -93,10 +94,11 @@ func TestFindAllTargets(t *testing.T) {
Shard: "test_shard0",
TabletType: topodatapb.TabletType_PRIMARY,
},
}, ks)
}, targets)
assert.EqualValues(t, []string{"test_keyspace"}, ksList)

// Get any keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Expand All @@ -105,7 +107,8 @@ func TestFindAllTargets(t *testing.T) {
Shard: "test_shard0",
TabletType: topodatapb.TabletType_PRIMARY,
},
}, ks)
}, targets)
assert.EqualValues(t, []string{"test_keyspace"}, ksList)

// Add another one.
assert.NoError(t, ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace2", &topodatapb.SrvKeyspace{
Expand All @@ -130,9 +133,9 @@ func TestFindAllTargets(t *testing.T) {
}))

// Get it for any keyspace, all types.
ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
sort.Sort(TargetArray(ks))
sort.Sort(TargetArray(targets))
assert.EqualValues(t, []*querypb.Target{
{
Cell: "cell1",
Expand All @@ -152,10 +155,12 @@ func TestFindAllTargets(t *testing.T) {
Shard: "test_shard2",
TabletType: topodatapb.TabletType_REPLICA,
},
}, ks)
}, targets)
sort.Strings(ksList)
assert.EqualValues(t, []string{"test_keyspace", "test_keyspace2"}, ksList)

// Only get 1 keyspace for all types.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Expand All @@ -170,10 +175,11 @@ func TestFindAllTargets(t *testing.T) {
Shard: "test_shard2",
TabletType: topodatapb.TabletType_REPLICA,
},
}, ks)
}, targets)
assert.EqualValues(t, []string{"test_keyspace2"}, ksList)

// Only get the REPLICA targets for any keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA})
targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.Equal(t, []*querypb.Target{
{
Expand All @@ -182,10 +188,13 @@ func TestFindAllTargets(t *testing.T) {
Shard: "test_shard2",
TabletType: topodatapb.TabletType_REPLICA,
},
}, ks)
}, targets)
sort.Strings(ksList)
assert.EqualValues(t, []string{"test_keyspace", "test_keyspace2"}, ksList)

// Get non-existent keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.Len(t, ks, 0)
assert.Len(t, targets, 0)
assert.EqualValues(t, []string{"doesnt-exist"}, ksList)
}
21 changes: 18 additions & 3 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,24 @@ func (gw *TabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait [
}

// Finds the targets to look for.
targets, err := srvtopo.FindAllTargets(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait)
targets, keyspaces, err := srvtopo.FindAllTargetsAndKeyspaces(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait)
if err != nil {
return err
}
return gw.hc.WaitForAllServingTablets(ctx, targets)
err = gw.hc.WaitForAllServingTablets(ctx, targets)
if err != nil {
return err
}
// After having waited for all serving tablets, we should also wait for the keyspace event watcher to have seen
// the updates and marked all the keyspaces as consistent (if we want to wait for primary tablets).
// Otherwise, we could be in a situation where even though the healthchecks have arrived, the keyspace event watcher hasn't finished processing them.
// So, if a primary tablet goes non-serving (because of a PRS or some other reason), we won't be able to start buffering.
// Waiting for the keyspaces to become consistent ensures that all the primary tablets for all the shards should be serving as seen by the keyspace event watcher
// and that any disruption from now on will make sure we start buffering properly.
if topoproto.IsTypeInList(topodatapb.TabletType_PRIMARY, tabletTypesToWait) && gw.kev != nil {
return gw.kev.WaitForConsistentKeyspaces(ctx, keyspaces)
}
return nil
}

// Close shuts down underlying connections.
Expand Down Expand Up @@ -282,7 +295,9 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
if len(tablets) == 0 {
// if we have a keyspace event watcher, check if the reason why our primary is not available is that it's currently being resharded
// or if a reparent operation is in progress.
if kev := gw.kev; kev != nil {
// We check whether a reshard is ongoing or whether the primary is not serving only if the target is a primary. We don't want to buffer
// replica queries, so it doesn't make any sense to check for resharding or reparenting in that case.
if kev := gw.kev; kev != nil && target.TabletType == topodatapb.TabletType_PRIMARY {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
if kev.TargetIsBeingResharded(ctx, target) {
log.V(2).Infof("current keyspace is being resharded, retrying: %s: %s", target.Keyspace, debug.Stack())
err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReshardingInProgress)
Expand Down
Loading
Loading