Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx throttler: remove unused topology watchers #14412

Merged
merged 4 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions go/vt/discovery/replicationlag.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ func SetMinNumTablets(numTablets int) {
minNumTablets.Set(numTablets)
}

// IsReplicationLagHigh verifies that the given LegacytabletHealth refers to a tablet with high
// IsReplicationLagHigh verifies that the given TabletHealth refers to a tablet with high
// replication lag, i.e. higher than the configured discovery_low_replication_lag flag.
func IsReplicationLagHigh(tabletHealth *TabletHealth) bool {
return float64(tabletHealth.Stats.ReplicationLagSeconds) > lowReplicationLag.Get().Seconds()
}

// IsReplicationLagVeryHigh verifies that the given LegacytabletHealth refers to a tablet with very high
// IsReplicationLagVeryHigh verifies that the given TabletHealth refers to a tablet with very high
// replication lag, i.e. higher than the configured discovery_high_replication_lag_minimum_serving flag.
func IsReplicationLagVeryHigh(tabletHealth *TabletHealth) bool {
return float64(tabletHealth.Stats.ReplicationLagSeconds) > highReplicationLagMinServing.Get().Seconds()
Expand Down Expand Up @@ -153,7 +153,7 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal
return filterStatsByLag(tabletHealthList)
}
res := filterStatsByLagWithLegacyAlgorithm(tabletHealthList)
// run the filter again if exactly one tablet is removed,
// Run the filter again if exactly one tablet is removed,
// and we have spare tablets.
if len(res) > minNumTablets.Get() && len(res) == len(tabletHealthList)-1 {
res = filterStatsByLagWithLegacyAlgorithm(res)
Expand All @@ -164,12 +164,12 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal

func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
list := make([]tabletLagSnapshot, 0, len(tabletHealthList))
// filter non-serving tablets and those with very high replication lag
// Filter out non-serving tablets and those with very high replication lag.
for _, ts := range tabletHealthList {
if !ts.Serving || ts.LastError != nil || ts.Stats == nil || IsReplicationLagVeryHigh(ts) {
continue
}
// Pull the current replication lag for a stable sort later.
// Save the current replication lag for a stable sort later.
list = append(list, tabletLagSnapshot{
ts: ts,
replag: ts.Stats.ReplicationLagSeconds})
Expand All @@ -178,7 +178,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
// Sort by replication lag.
sort.Sort(tabletLagSnapshotList(list))

// Pick those with low replication lag, but at least minNumTablets tablets regardless.
// Pick tablets with low replication lag, but at least minNumTablets tablets regardless.
res := make([]*TabletHealth, 0, len(list))
for i := 0; i < len(list); i++ {
if !IsReplicationLagHigh(list[i].ts) || i < minNumTablets.Get() {
Expand All @@ -190,7 +190,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {

func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*TabletHealth {
list := make([]*TabletHealth, 0, len(tabletHealthList))
// filter non-serving tablets
// Filter out non-serving tablets.
for _, ts := range tabletHealthList {
if !ts.Serving || ts.LastError != nil || ts.Stats == nil {
continue
Expand All @@ -200,7 +200,7 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if len(list) <= 1 {
return list
}
// if all have low replication lag (<=30s), return all tablets.
// If all tablets have low replication lag (<=30s), return all of them.
allLowLag := true
for _, ts := range list {
if IsReplicationLagHigh(ts) {
Expand All @@ -211,12 +211,12 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if allLowLag {
return list
}
// filter those affecting "mean" lag significantly
// calculate mean for all tablets
// We want to filter out tablets that are affecting "mean" lag significantly.
// We first calculate the mean across all tablets.
res := make([]*TabletHealth, 0, len(list))
m, _ := mean(list, -1)
for i, ts := range list {
// calculate mean by excluding ith tablet
// Now we calculate the mean by excluding ith tablet
mi, _ := mean(list, i)
if float64(mi) > float64(m)*0.7 {
res = append(res, ts)
Expand All @@ -225,9 +225,11 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if len(res) >= minNumTablets.Get() {
return res
}
// return at least minNumTablets tablets to avoid over loading,
// if there is enough tablets with replication lag < highReplicationLagMinServing.
// Pull the current replication lag for a stable sort.

// We want to return at least minNumTablets tablets to avoid overloading,
// as long as there are enough tablets with replication lag < highReplicationLagMinServing.

// Save the current replication lag for a stable sort.
snapshots := make([]tabletLagSnapshot, 0, len(list))
for _, ts := range list {
if !IsReplicationLagVeryHigh(ts) {
Expand Down
70 changes: 34 additions & 36 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ var (
"Operation", topologyWatcherOpListTablets, topologyWatcherOpGetTablet)
)

// tabletInfo is used internally by the TopologyWatcher class
// tabletInfo is used internally by the TopologyWatcher struct.
type tabletInfo struct {
alias string
tablet *topodata.Tablet
}

// TopologyWatcher polls tablet from a configurable set of tablets
// periodically. When tablets are added / removed, it calls
// the LegacyTabletRecorder AddTablet / RemoveTablet interface appropriately.
// TopologyWatcher polls the topology periodically for changes to
// the set of tablets. When tablets are added / removed / modified,
// it calls the AddTablet / RemoveTablet interface appropriately.
type TopologyWatcher struct {
// set at construction time
topoServer *topo.Server
Expand All @@ -79,20 +79,21 @@ type TopologyWatcher struct {

// mu protects all variables below
mu sync.Mutex
// tablets contains a map of alias -> tabletInfo for all known tablets
// tablets contains a map of alias -> tabletInfo for all known tablets.
tablets map[string]*tabletInfo
// topoChecksum stores a crc32 of the tablets map and is exported as a metric
// topoChecksum stores a crc32 of the tablets map and is exported as a metric.
topoChecksum uint32
// lastRefresh records the timestamp of the last topo refresh
// lastRefresh records the timestamp of the last refresh of the topology.
lastRefresh time.Time
// firstLoadDone is true when first load of the topology data is done.
// firstLoadDone is true when the initial load of the topology data is complete.
firstLoadDone bool
// firstLoadChan is closed when the initial loading of topology data is done.
// firstLoadChan is closed when the initial load of topology data is complete.
firstLoadChan chan struct{}
}

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
// the tablets that it is configured to watch, and reloads them periodically if needed.
// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
Expand All @@ -114,14 +115,14 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
}

// Start starts the topology watcher
// Start starts the topology watcher.
func (tw *TopologyWatcher) Start() {
tw.wg.Add(1)
go func(t *TopologyWatcher) {
Expand All @@ -140,7 +141,7 @@ func (tw *TopologyWatcher) Start() {
}(tw)
}

// Stop stops the watcher. It does not clean up the tablets added to LegacyTabletRecorder.
// Stop stops the watcher. It does not clean up the tablets added to HealthCheck.
func (tw *TopologyWatcher) Stop() {
tw.cancelFunc()
// wait for watch goroutine to finish.
Expand All @@ -151,7 +152,7 @@ func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)

// first get the list of relevant tabletAliases
// First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
Expand All @@ -166,7 +167,7 @@ func (tw *TopologyWatcher) loadTablets() {
}

// Accumulate a list of all known alias strings to use later
// when sorting
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))

tw.mu.Lock()
Expand All @@ -175,7 +176,7 @@ func (tw *TopologyWatcher) loadTablets() {
tabletAliasStrs = append(tabletAliasStrs, aliasStr)

if !tw.refreshKnownTablets {
// we already have a tabletInfo for this and the flag tells us to not refresh
// We already have a tabletInfo for this and the flag tells us to not refresh.
if val, ok := tw.tablets[aliasStr]; ok {
newTablets[aliasStr] = val
continue
Expand All @@ -188,7 +189,7 @@ func (tw *TopologyWatcher) loadTablets() {
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run
<-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
Expand Down Expand Up @@ -218,7 +219,7 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}

// trust the alias from topo and add it if it doesn't exist
// Trust the alias from topo and add it if it doesn't exist.
if val, ok := tw.tablets[alias]; ok {
// check if the host and port have changed. If yes, replace tablet.
oldKey := TabletToMapKey(val.tablet)
Expand All @@ -230,7 +231,7 @@ func (tw *TopologyWatcher) loadTablets() {
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
}
} else {
// This is a new tablet record, let's add it to the healthcheck
// This is a new tablet record, let's add it to the HealthCheck.
tw.healthcheck.AddTablet(newVal.tablet)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
}
Expand All @@ -252,8 +253,8 @@ func (tw *TopologyWatcher) loadTablets() {
close(tw.firstLoadChan)
}

// iterate through the tablets in a stable order and compute a
// checksum of the tablet map
// Iterate through the tablets in a stable order and compute a
// checksum of the tablet map.
sort.Strings(tabletAliasStrs)
var buf bytes.Buffer
for _, alias := range tabletAliasStrs {
Expand All @@ -269,15 +270,15 @@ func (tw *TopologyWatcher) loadTablets() {

}

// RefreshLag returns the time since the last refresh
// RefreshLag returns the time since the last refresh.
func (tw *TopologyWatcher) RefreshLag() time.Duration {
tw.mu.Lock()
defer tw.mu.Unlock()

return time.Since(tw.lastRefresh)
}

// TopoChecksum returns the checksum of the current state of the topo
// TopoChecksum returns the checksum of the current state of the topo.
func (tw *TopologyWatcher) TopoChecksum() uint32 {
tw.mu.Lock()
defer tw.mu.Unlock()
Expand All @@ -286,7 +287,7 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
}

// TabletFilter is an interface that can be given to a TopologyWatcher
// to be applied as an additional filter on the list of tablets returned by its getTablets function
// to be applied as an additional filter on the list of tablets returned by its getTablets function.
type TabletFilter interface {
// IsIncluded returns whether tablet is included in this filter
IsIncluded(tablet *topodata.Tablet) bool
Expand All @@ -300,18 +301,18 @@ type FilterByShard struct {
}

// filterShard describes a filter for a given shard or keyrange inside
// a keyspace
// a keyspace.
type filterShard struct {
keyspace string
shard string
keyRange *topodata.KeyRange // only set if shard is also a KeyRange
}

// NewFilterByShard creates a new FilterByShard on top of an existing
// LegacyTabletRecorder. Each filter is a keyspace|shard entry, where shard
// NewFilterByShard creates a new FilterByShard for use by a
// TopologyWatcher. Each filter is a keyspace|shard entry, where shard
// can either be a shard name, or a keyrange. All tablets that match
// at least one keyspace|shard tuple will be forwarded to the
// underlying LegacyTabletRecorder.
// at least one keyspace|shard tuple will be forwarded by the
// TopologyWatcher to its consumer.
func NewFilterByShard(filters []string) (*FilterByShard, error) {
m := make(map[string][]*filterShard)
for _, filter := range filters {
Expand Down Expand Up @@ -348,8 +349,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
}, nil
}

// IsIncluded returns true iff the tablet's keyspace and shard should be
// forwarded to the underlying LegacyTabletRecorder.
// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
if err != nil {
Expand All @@ -370,15 +370,14 @@ func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
return false
}

// FilterByKeyspace is a filter that filters tablets by
// keyspace
// FilterByKeyspace is a filter that filters tablets by keyspace.
type FilterByKeyspace struct {
keyspaces map[string]bool
}

// NewFilterByKeyspace creates a new FilterByKeyspace.
// Each filter is a keyspace entry. All tablets that match
// a keyspace will be forwarded to the underlying LegacyTabletRecorder.
// a keyspace will be forwarded to the TopologyWatcher's consumer.
func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
m := make(map[string]bool)
for _, keyspace := range selectedKeyspaces {
Expand All @@ -390,8 +389,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
}
}

// IsIncluded returns true if the tablet's keyspace should be
// forwarded to the underlying LegacyTabletRecorder.
// IsIncluded returns true if the tablet's keyspace matches what we have.
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
Expand Down

This file was deleted.

Loading
Loading