Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Dec 3, 2024
1 parent 2b71d1b commit 26a505e
Showing 1 changed file with 75 additions and 0 deletions.
75 changes: 75 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,81 @@ func RegisterFlags(fs *pflag.FlagSet) {
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}

// tabletFilter represents a filter for tablet records.
type tabletFilter interface {
IsIncluded(tablet *topodatapb.Tablet) bool
}

// keyspaceShardTabletFilter filters tablet records
// by keyspace and optionally shard.
type keyspaceShardTabletFilter struct {
keyspace, shard string
}

// NewKeyspaceTabletFilter creates a tablet record filter
// by keyspace.
func NewKeyspaceTabletFilter(keyspace string) tabletFilter {
return &keyspaceShardTabletFilter{keyspace}
}

// NewKeyspaceShardTabletFilter creates a tablet record filter
// by keyspace and shard.
func NewKeyspaceShardTabletFilter(keyspace, shard string) tabletFilter {
return &keyspaceShardTabletFilter{keyspace, shard}
}

// IsIncluded returns true if a tablet record passes the filter.
func (f keyspaceShardTabletFilter) IsIncluded(tablet *topodatapb.Tablet) bool {
if f.shard == "" {
return tablet.Keyspace == f.keyspace
}
return tablet.Keyspace == f.keyspace && tablet.Shard == f.shard
}

// tabletFilters represents one or many tabletFilter objects.
type tabletFilters []tabletFilter

// IsIncluded returns true if a tablet record passes the filter.
func (f tabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool {
for _, filter := range f {
if !filter.IsIncluded(tablet) {
return false
}
}
return true
}

// GetAllTablets gets all tablets from all cells using a goroutine per cell and a
// concurrency limit enforced by errgroup.
func GetAllTablets(ctx context.Context, cells []string) ([]*topo.TabletInfo, error) {
maxConcurrency := 4
cellConcurrency := 1
if maxConcurrency < topo.DefaultConcurrency {
cellConcurrency = topo.DefaultConcurrency / maxConcurrency
}

var tabletsMu sync.Mutex
tablets := make([]*topo.TabletInfo, 0)
eg, ctx := errgroup.WaitContext(ctx)
eg.SetLimit(maxConcurrency) // limit to 4 concurrency
for _, cell := range cells {
eg.Go(func() error {
t, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{
Concurrency: cellConcurrency,
})
if err != nil {
log.Errorf("Failed to load tablets from cell %s: %+v", cell, err)
return nil
}
tabletsMu.Lock()
defer tabletsMu.Unlock()
tablets = append(tablets, t)
return nil
})
}
return tablets, eg.Wait()
}

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
// channel for polling.
func OpenTabletDiscovery() <-chan time.Time {
Expand Down

0 comments on commit 26a505e

Please sign in to comment.