Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Dec 13, 2024
1 parent 71d84d5 commit a1c4167
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 24 deletions.
54 changes: 32 additions & 22 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,27 @@ var (
shutdownWaitTime = 30 * time.Second
shardsLockCounter int32
shardsToWatch = make(map[string]bool, 0)
shardsToWatchMu sync.Mutex

// ErrNoPrimaryTablet is a fixed error message.
ErrNoPrimaryTablet = errors.New("no primary tablet found")
)

// parseClustersToWatch parses the --clusters_to_watch flag-value
// into a map of keyspace/shards. This is called once at init
// time because the list never changes.
func parseClustersToWatch() {
// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}

// updateShardsToWatch parses the --clusters_to_watch flag-value
// into a map of keyspace/shards.
func updateShardsToWatch() {
if ts == nil {
return
}
shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()

for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") {
// Validate keyspace/shard parses.
Expand Down Expand Up @@ -90,14 +102,8 @@ func parseClustersToWatch() {
}
}

// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}

// GetAllTablets gets all tablets from all cells using a goroutine per cell.
func GetAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo {
// getAllTablets gets all tablets from all cells using a goroutine per cell.
func getAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo {
var tabletsMu sync.Mutex
tablets := make([]*topo.TabletInfo, 0)
eg, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -130,7 +136,7 @@ func OpenTabletDiscovery() <-chan time.Time {
log.Error(err)
}
// Parse --clusters_to_watch into a filter.
parseClustersToWatch()
updateShardsToWatch()
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
Expand Down Expand Up @@ -161,25 +167,29 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
// Get all tablets from all cells.
getTabletsCtx, getTabletsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getTabletsCancel()
tablets := GetAllTablets(getTabletsCtx, cells)
tablets := getAllTablets(getTabletsCtx, cells)
if len(tablets) == 0 {
log.Error("Found no tablets")
return nil
}

// Filter tablets that should not be watched using shardsToWatch map.
filteredTablets := make([]*topo.TabletInfo, 0, len(tablets))
for _, t := range tablets {
shardKey := topoproto.KeyspaceShardString(t.Tablet.Keyspace, t.Tablet.Shard)
if len(shardsToWatch) > 0 && !shardsToWatch[shardKey] {
continue // filter
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()
for _, t := range tablets {
shardKey := topoproto.KeyspaceShardString(t.Tablet.Keyspace, t.Tablet.Shard)
if len(shardsToWatch) > 0 && !shardsToWatch[shardKey] {
continue // filter
}
matchedTablets = append(matchedTablets, t)
}
filteredTablets = append(filteredTablets, t)
}
}()

// Refresh the filtered tablets.
query := "select alias from vitess_tablet"
refreshTablets(filteredTablets, query, nil, loader, forceRefresh, nil)
refreshTablets(matchedTablets, query, nil, loader, forceRefresh, nil)

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var (
}
)

func TestParseClustersToWatch(t *testing.T) {
func TestUpdateShardsToWatch(t *testing.T) {
oldClustersToWatch := clustersToWatch
oldTs := ts
defer func() {
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestParseClustersToWatch(t *testing.T) {
shardsToWatch = make(map[string]bool, 0)
}()
clustersToWatch = testCase.in
parseClustersToWatch()
updateShardsToWatch()
require.Equal(t, testCase.expected, shardsToWatch)
})
}
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ func refreshAllInformation(ctx context.Context) error {
return RefreshAllKeyspacesAndShards(ctx)
})

// Refresh shards to watch.
eg.Go(func() error {
updateShardsToWatch()
return nil
})

// Refresh all tablets.
eg.Go(func() error {
return refreshAllTablets(ctx)
Expand Down

0 comments on commit a1c4167

Please sign in to comment.