Skip to content

Commit

Permalink
healthcheck: get all tablets in one topo call if possible
Browse files Browse the repository at this point in the history
Signed-off-by: deepthi <[email protected]>
  • Loading branch information
deepthi committed Dec 6, 2023
1 parent 7fa919c commit 6a7cf27
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 52 deletions.
56 changes: 14 additions & 42 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
sem chan int
getTablets func(tw *TopologyWatcher) ([]*topo.TabletInfo, error)
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -94,7 +93,7 @@ type TopologyWatcher struct {
// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets that it is configured to watch, and reloads them periodically if needed.
// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topo.TabletInfo, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
Expand All @@ -103,7 +102,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
Expand All @@ -117,8 +115,8 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: topoReadConcurrency})
})
}

Expand Down Expand Up @@ -149,11 +147,10 @@ func (tw *TopologyWatcher) Stop() {
}

func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)

// First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
// First get the list of all tablets.
tabletInfos, err := tw.getTablets(tw)
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
Expand All @@ -168,11 +165,11 @@ func (tw *TopologyWatcher) loadTablets() {

// Accumulate a list of all known alias strings to use later
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))
tabletAliasStrs := make([]string, 0, len(tabletInfos))

tw.mu.Lock()
for _, tAlias := range tabletAliases {
aliasStr := topoproto.TabletAliasString(tAlias)
for _, tInfo := range tabletInfos {
aliasStr := topoproto.TabletAliasString(tInfo.Alias)
tabletAliasStrs = append(tabletAliasStrs, aliasStr)

if !tw.refreshKnownTablets {
Expand All @@ -182,38 +179,13 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}
}

wg.Add(1)
go func(alias *topodata.TabletAlias) {
defer wg.Done()
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
case <-tw.ctx.Done():
return
default:
}
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
return
}
tw.mu.Lock()
aliasStr := topoproto.TabletAliasString(alias)
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tablet.Tablet,
}
tw.mu.Unlock()
}(tAlias)
// There's no network call here, so we just do the tablets one at a time instead of in parallel goroutines.
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tInfo.Tablet,
}
}

tw.mu.Unlock()
wg.Wait()
tw.mu.Lock()

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
continue
Expand Down
20 changes: 10 additions & 10 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
t.Fatalf("CreateTablet failed: %v", err)
}
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check the tablet is returned by GetAllTablets().
Expand Down Expand Up @@ -178,9 +178,9 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
// If refreshKnownTablets is disabled, only the new tablet is read
// from the topo
if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
}
checkChecksum(t, tw, 2762153755)

Expand All @@ -195,7 +195,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
// only the list is read from the topo and the checksum doesn't change
tw.loadTablets()
if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0})
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1})
}
Expand All @@ -221,7 +221,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
key = TabletToMapKey(tablet)

if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 1})

if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
Expand Down Expand Up @@ -264,7 +264,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
t.Fatalf("UpdateTabletFields failed: %v", err)
}
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2})
allTablets = fhc.GetAllTablets()
key2 := TabletToMapKey(tablet2)
if _, ok := allTablets[key2]; !ok {
Expand All @@ -288,7 +288,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
t.Fatalf("UpdateTabletFields failed: %v", err)
}
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2})
}

// Remove the tablet and check that it is detected as being gone.
Expand All @@ -300,7 +300,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
}
tw.loadTablets()
if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1})
}
Expand Down Expand Up @@ -551,7 +551,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
require.NoError(t, ts.CreateTablet(context.Background(), tablet))

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check tablet is reported by HealthCheck
Expand All @@ -576,7 +576,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
require.NoError(t, ts.CreateTablet(context.Background(), tablet2))

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1})
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0})
checkChecksum(t, tw, 2762153755)

// Check the new tablet is NOT reported by HealthCheck.
Expand Down

0 comments on commit 6a7cf27

Please sign in to comment.