From e6d7f233cfb511796229a7b411a8c44868775fe6 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 28 Nov 2024 18:09:44 +0100 Subject: [PATCH] WIP Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/keyspace_shard_discovery.go | 51 ++--- go/vt/vtorc/logic/tablet_discovery.go | 180 ++++++++++++------ go/vt/vtorc/logic/vtorc.go | 73 ++++--- 3 files changed, 203 insertions(+), 101 deletions(-) diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index b1e93fe2a01..c21ab3a0d0a 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -23,20 +23,27 @@ import ( "sync" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtorc/inst" ) +type KeyspaceShardDiscovery struct { + ts *topo.Server +} + +func NewKeyspaceShardDiscovery(ts *topo.Server) *KeyspaceShardDiscovery { + return &KeyspaceShardDiscovery{ts} +} + // RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with. -func RefreshAllKeyspacesAndShards() { +func (ksd *KeyspaceShardDiscovery) RefreshAllKeyspacesAndShards(ctx context.Context) { var keyspaces []string if len(clustersToWatch) == 0 { // all known keyspaces - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + getCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() var err error // Get all the keyspaces - keyspaces, err = ts.GetKeyspaces(ctx) + keyspaces, err = ksd.ts.GetKeyspaces(getCtx) if err != nil { log.Error(err) return @@ -62,7 +69,7 @@ func RefreshAllKeyspacesAndShards() { // Sort the list of keyspaces. // The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace sort.Strings(keyspaces) - refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() var wg sync.WaitGroup for idx, keyspace := range keyspaces { @@ -75,42 +82,42 @@ func RefreshAllKeyspacesAndShards() { wg.Add(2) go func(keyspace string) { defer wg.Done() - _ = refreshKeyspaceHelper(refreshCtx, keyspace) + _ = ksd.refreshKeyspaceHelper(refreshCtx, keyspace) }(keyspace) go func(keyspace string) { defer wg.Done() - _ = refreshAllShards(refreshCtx, keyspace) + _ = ksd.refreshAllShards(refreshCtx, keyspace) }(keyspace) } wg.Wait() } // RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard. -func RefreshKeyspaceAndShard(keyspaceName string, shardName string) error { - err := refreshKeyspace(keyspaceName) +func (ksd *KeyspaceShardDiscovery) RefreshKeyspaceAndShard(ctx context.Context, keyspaceName string, shardName string) error { + err := ksd.refreshKeyspace(ctx, keyspaceName) if err != nil { return err } - return refreshShard(keyspaceName, shardName) + return ksd.refreshShard(ctx, keyspaceName, shardName) } // refreshKeyspace refreshes the keyspace's information for the given keyspace from the topo -func refreshKeyspace(keyspaceName string) error { - refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) +func (ksd *KeyspaceShardDiscovery) refreshKeyspace(ctx context.Context, keyspaceName string) error { + refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() - return refreshKeyspaceHelper(refreshCtx, keyspaceName) + return ksd.refreshKeyspaceHelper(refreshCtx, keyspaceName) } // refreshShard refreshes the shard's information for the given keyspace/shard from the topo -func refreshShard(keyspaceName, shardName string) error { - refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) +func (ksd *KeyspaceShardDiscovery) refreshShard(ctx context.Context, keyspaceName, shardName string) error { + refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() - return refreshSingleShardHelper(refreshCtx, keyspaceName, shardName) + return ksd.refreshSingleShardHelper(refreshCtx, keyspaceName, shardName) } // refreshKeyspaceHelper is a helper function which reloads the given keyspace's information -func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error { - keyspaceInfo, err := ts.GetKeyspace(ctx, keyspaceName) +func (ksd *KeyspaceShardDiscovery) refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error { + keyspaceInfo, err := ksd.ts.GetKeyspace(ctx, keyspaceName) if err != nil { log.Error(err) return err @@ -123,8 +130,8 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error { } // refreshAllShards refreshes all the shard records in the given keyspace. -func refreshAllShards(ctx context.Context, keyspaceName string) error { - shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{ +func (ksd *KeyspaceShardDiscovery) refreshAllShards(ctx context.Context, keyspaceName string) error { + shardInfos, err := ksd.ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{ // Fetch shard records concurrently to speed up discovery. A typical // Vitess cluster will have 1-3 vtorc instances deployed, so there is // little risk of a thundering herd. @@ -145,8 +152,8 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { } // refreshSingleShardHelper is a helper function that refreshes the shard record of the given keyspace/shard. -func refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardName string) error { - shardInfo, err := ts.GetShard(ctx, keyspaceName, shardName) +func (ksd *KeyspaceShardDiscovery) refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardName string) error { + shardInfo, err := ksd.ts.GetShard(ctx, keyspaceName, shardName) if err != nil { log.Error(err) return err diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 1ee9e2cfefa..5308d19e2e0 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -27,12 +27,12 @@ import ( "time" "github.com/spf13/pflag" - "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" @@ -40,16 +40,13 @@ import ( "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vtorc/process" "vitess.io/vitess/go/vt/vttablet/tmclient" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( - ts *topo.Server - tmc tmclient.TabletManagerClient clustersToWatch []string shutdownWaitTime = 30 * time.Second shardsLockCounter int32 + // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") ) @@ -60,42 +57,99 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") } +type TabletDiscovery struct { + keyspaceShards []*topo.KeyspaceShard + refreshTicker *time.Ticker + vtorc *VTOrc +} + // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker // channel for polling. -func OpenTabletDiscovery() <-chan time.Time { - ts = topo.Open() - tmc = inst.InitializeTMC() +func NewTabletDiscovery(v *VTOrc) (*TabletDiscovery, error) { // Clear existing cache and perform a new refresh. if _, err := db.ExecVTOrc("DELETE FROM vitess_tablet"); err != nil { - log.Error(err) + return nil, err + } + + td := &TabletDiscovery{ + vtorc: v, } + + // Parse --clusters_to_watch to keyspace/shard infos. + td.parseKeyspaceShardsToWatch() + // We refresh all information from the topo once before we start the ticks to do it on a timer. - populateAllInformation() - return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker -} + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + if err := td.vtorc.RefreshAllInformation(ctx); err != nil { + return nil, err + } + td.refreshTicker = time.NewTicker(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) -// populateAllInformation initializes all the information for VTOrc to function. -func populateAllInformation() { - refreshAllInformation() // We have completed one full discovery cycle. We should update the process health. process.FirstDiscoveryCycleComplete.Store(true) + + return td, nil +} + +func (td *TabletDiscovery) Close() { + if td.refreshTicker != nil { + td.refreshTicker.Stop() + } +} + +func (td *TabletDiscovery) tmClient() tmclient.TabletManagerClient { return td.vtorc.tmc } +func (td *TabletDiscovery) topoServer() *topo.Server { return td.vtorc.ts } + +// Parse input and build list of keyspaces / shards +func (td *TabletDiscovery) parseKeyspaceShardsToWatch() { + td.keyspaceShards = make([]*topo.KeyspaceShard, 0) + for _, ks := range clustersToWatch { + if strings.Contains(ks, "/") { + // This is a keyspace/shard specification + input := strings.Split(ks, "/") + td.keyspaceShards = append(td.keyspaceShards, &topo.KeyspaceShard{ + Keyspace: input[0], + Shard: input[1], + }) + } else { + // Assume this is a keyspace and find all shards in keyspace + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + shards, err := td.topoServer().GetShardNames(ctx, ks) + if err != nil { + // Log the errr and continue + log.Errorf("Error fetching shards for keyspace: %v", ks) + continue + } + if len(shards) == 0 { + log.Errorf("Topo has no shards for ks: %v", ks) + continue + } + for _, s := range shards { + td.keyspaceShards = append(td.keyspaceShards, &topo.KeyspaceShard{ + Keyspace: ks, + Shard: s, + }) + } + } + } } -// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while -func refreshAllTablets() { - refreshTabletsUsing(func(tabletAlias string) { - DiscoverInstance(tabletAlias, false /* forceDiscovery */) +// RefreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while +func (td *TabletDiscovery) RefreshAllTablets() error { + return td.refreshTabletsUsing(func(tabletAlias string) { + td.vtorc.DiscoverInstance(tabletAlias, false /* forceDiscovery */) }, false /* forceRefresh */) } -func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { +func (td *TabletDiscovery) refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) error { if len(clustersToWatch) == 0 { // all known clusters ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() - cells, err := ts.GetKnownCells(ctx) + cells, err := td.topoServer().GetKnownCells(ctx) if err != nil { - log.Error(err) - return + return err } refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) @@ -105,7 +159,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { wg.Add(1) go func(cell string) { defer wg.Done() - refreshTabletsInCell(refreshCtx, cell, loader, forceRefresh) + td.refreshTabletsInCell(refreshCtx, cell, loader, forceRefresh) // TODO: errgroup }(cell) } wg.Wait() @@ -121,7 +175,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { // Assume this is a keyspace and find all shards in keyspace ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() - shards, err := ts.GetShardNames(ctx, ks) + shards, err := td.topoServer().GetShardNames(ctx, ks) if err != nil { // Log the errr and continue log.Errorf("Error fetching shards for keyspace: %v", ks) @@ -138,7 +192,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { } if len(keyspaceShards) == 0 { log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch) - return + return nil } refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer refreshCancel() @@ -147,54 +201,69 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { wg.Add(1) go func(ks *topo.KeyspaceShard) { defer wg.Done() - refreshTabletsInKeyspaceShard(refreshCtx, ks.Keyspace, ks.Shard, loader, forceRefresh, nil) + td.refreshTabletsInKeyspaceShard(refreshCtx, ks.Keyspace, ks.Shard, loader, forceRefresh, nil) }(ks) } wg.Wait() } + return nil } -func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) { +func (td *TabletDiscovery) refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) error { +<<<<<<< Updated upstream +<<<<<<< Updated upstream +<<<<<<< Updated upstream tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency}) +======= + tablets, err := td.topoServer().GetTabletsByCell(ctx, cell, nil) +>>>>>>> Stashed changes +======= + tablets, err := td.topoServer().GetTabletsByCell(ctx, cell, nil) +>>>>>>> Stashed changes +======= + tablets, err := td.topoServer().GetTabletsByCell(ctx, cell, nil) +>>>>>>> Stashed changes if err != nil { log.Errorf("Error fetching topo info for cell %v: %v", cell, err) - return + return err } - query := "select alias from vitess_tablet where cell = ?" + query := "SELECT alias FROM vitess_tablet WHERE cell = ?" args := sqlutils.Args(cell) refreshTablets(tablets, query, args, loader, forceRefresh, nil) + return err } // forceRefreshAllTabletsInShard is used to refresh all the tablet's information (both MySQL information and topo records) // for a given shard. This function is meant to be called before or after a cluster-wide operation that we know will // change the replication information for the entire cluster drastically enough to warrant a full forceful refresh -func forceRefreshAllTabletsInShard(ctx context.Context, keyspace, shard string, tabletsToIgnore []string) { +func (td *TabletDiscovery) forceRefreshAllTabletsInShard(ctx context.Context, keyspace, shard string, tabletsToIgnore []string) error { refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() - refreshTabletsInKeyspaceShard(refreshCtx, keyspace, shard, func(tabletAlias string) { - DiscoverInstance(tabletAlias, true) + return td.refreshTabletsInKeyspaceShard(refreshCtx, keyspace, shard, func(tabletAlias string) { + td.vtorc.DiscoverInstance(tabletAlias, true) }, true, tabletsToIgnore) } // refreshTabletInfoOfShard only refreshes the tablet records from the topo-server for all the tablets // of the given keyspace-shard. -func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) { - log.Infof("refresh of tablet records of shard - %v/%v", keyspace, shard) - refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(tabletAlias string) { +func (td *TabletDiscovery) refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) error { + log.Infof("Refresh of tablet records of shard - %v/%v", keyspace, shard) + return td.refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(tabletAlias string) { // No-op // We only want to refresh the tablet information for the given shard }, false, nil) } -func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { - tablets, err := ts.GetTabletsByShard(ctx, keyspace, shard) +func (td *TabletDiscovery) refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) error { + tablets, err := td.topoServer().GetTabletsByShard(ctx, keyspace, shard) if err != nil { log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err) - return + return err } - query := "select alias from vitess_tablet where keyspace = ? and shard = ?" + query := "SELECT alias FROM vitess_tablet WHERE keyspace = ? AND shard = ?" args := sqlutils.Args(keyspace, shard) refreshTablets(tablets, query, args, loader, forceRefresh, tabletsToIgnore) + return nil } func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { @@ -207,14 +276,14 @@ func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader latestInstances[tabletAliasString] = true old, err := inst.ReadTablet(tabletAliasString) if err != nil && err != inst.ErrTabletAliasNil { - log.Error(err) + log.Errorf("Failed to read tablet %s: %+v", tabletAliasString, err) continue } if !forceRefresh && proto.Equal(tablet, old) { continue } if err := inst.SaveTablet(tablet); err != nil { - log.Error(err) + log.Errorf("Failed to save tablet %s: %+v", tabletAliasString, err) continue } wg.Add(1) @@ -253,7 +322,7 @@ func getLockAction(analysedInstance string, code inst.AnalysisCode) string { } // LockShard locks the keyspace-shard preventing others from performing conflicting actions. -func LockShard(ctx context.Context, tabletAlias string, lockAction string) (context.Context, func(*error), error) { +func (td *TabletDiscovery) LockShard(ctx context.Context, tabletAlias string, lockAction string) (context.Context, func(*error), error) { if tabletAlias == "" { return nil, nil, errors.New("can't lock shard: instance is unspecified") } @@ -268,7 +337,7 @@ func LockShard(ctx context.Context, tabletAlias string, lockAction string) (cont } atomic.AddInt32(&shardsLockCounter, 1) - ctx, unlock, err := ts.TryLockShard(ctx, tablet.Keyspace, tablet.Shard, lockAction) + ctx, unlock, err := td.topoServer().TryLockShard(ctx, tablet.Keyspace, tablet.Shard, lockAction) if err != nil { atomic.AddInt32(&shardsLockCounter, -1) return nil, nil, err @@ -280,38 +349,38 @@ func LockShard(ctx context.Context, tabletAlias string, lockAction string) (cont } // tabletUndoDemotePrimary calls the said RPC for the given tablet. -func tabletUndoDemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, semiSync bool) error { +func (td *TabletDiscovery) tabletUndoDemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, semiSync bool) error { tmcCtx, tmcCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer tmcCancel() - return tmc.UndoDemotePrimary(tmcCtx, tablet, semiSync) + return td.tmClient().UndoDemotePrimary(tmcCtx, tablet, semiSync) } // setReadOnly calls the said RPC for the given tablet -func setReadOnly(ctx context.Context, tablet *topodatapb.Tablet) error { +func (td *TabletDiscovery) setReadOnly(ctx context.Context, tablet *topodatapb.Tablet) error { tmcCtx, tmcCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer tmcCancel() - return tmc.SetReadOnly(tmcCtx, tablet) + return td.tmClient().SetReadOnly(tmcCtx, tablet) } // changeTabletType calls the said RPC for the given tablet with the given parameters. -func changeTabletType(ctx context.Context, tablet *topodatapb.Tablet, tabletType topodatapb.TabletType, semiSync bool) error { +func (td *TabletDiscovery) changeTabletType(ctx context.Context, tablet *topodatapb.Tablet, tabletType topodatapb.TabletType, semiSync bool) error { tmcCtx, tmcCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer tmcCancel() - return tmc.ChangeType(tmcCtx, tablet, tabletType, semiSync) + return td.tmClient().ChangeType(tmcCtx, tablet, tabletType, semiSync) } // resetReplicationParameters resets the replication parameters on the given tablet. -func resetReplicationParameters(ctx context.Context, tablet *topodatapb.Tablet) error { +func (td *TabletDiscovery) resetReplicationParameters(ctx context.Context, tablet *topodatapb.Tablet) error { tmcCtx, tmcCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer tmcCancel() - return tmc.ResetReplicationParameters(tmcCtx, tablet) + return td.tmClient().ResetReplicationParameters(tmcCtx, tablet) } // setReplicationSource calls the said RPC with the parameters provided -func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, primary *topodatapb.Tablet, semiSync bool, heartbeatInterval float64) error { +func (td *TabletDiscovery) setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, primary *topodatapb.Tablet, semiSync bool, heartbeatInterval float64) error { tmcCtx, tmcCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer tmcCancel() - return tmc.SetReplicationSource(tmcCtx, replica, primary.Alias, 0, "", true, semiSync, heartbeatInterval) + return td.tmClient().SetReplicationSource(tmcCtx, replica, primary.Alias, 0, "", true, semiSync, heartbeatInterval) } // shardPrimary finds the primary of the given keyspace-shard by reading the vtorc backend @@ -325,8 +394,7 @@ func shardPrimary(keyspace string, shard string) (primary *topodatapb.Tablet, er AND tablet_type = ? ORDER BY primary_timestamp DESC - LIMIT 1 -` + LIMIT 1` err = db.Db.QueryVTOrc(query, sqlutils.Args(keyspace, shard, topodatapb.TabletType_PRIMARY), func(m sqlutils.RowMap) error { if primary == nil { primary = &topodatapb.Tablet{} diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 9a468d1508a..550d3816234 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -17,6 +17,7 @@ package logic import ( + "context" "os" "os/signal" "sync" @@ -26,16 +27,19 @@ import ( "github.com/patrickmn/go-cache" "github.com/sjmudd/stopwatch" + "golang.org/x/sync/errgroup" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtorc/collection" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/discovery" "vitess.io/vitess/go/vt/vtorc/inst" ometrics "vitess.io/vitess/go/vt/vtorc/metrics" "vitess.io/vitess/go/vt/vtorc/util" + "vitess.io/vitess/go/vt/vttablet/tmclient" ) const ( @@ -93,21 +97,47 @@ func acceptSighupSignal() { }() } +type VTOrc struct { + ksd *KeyspaceShardDiscovery + td *TabletDiscovery + tmc tmclient.TabletManagerClient + ts *topo.Server +} + +func NewVTOrc() (v *VTOrc, err error) { + ts := topo.Open() + tmc := inst.InitializeTMC() + + v = &VTOrc{ + ksd: NewKeyspaceShardDiscovery(ts), + tmc: tmc, + ts: ts, + } + + v.td, err = NewTabletDiscovery(v) + return v, err +} + // closeVTOrc runs all the operations required to cleanly shutdown VTOrc -func closeVTOrc() { +func (v *VTOrc) Close() { log.Infof("Starting VTOrc shutdown") atomic.StoreInt32(&hasReceivedSIGTERM, 1) discoveryMetrics.StopAutoExpiration() // Poke other go routines to stop cleanly here ... _ = inst.AuditOperation("shutdown", "", "Triggered via SIGTERM") // wait for the locks to be released - waitForLocksRelease() - ts.Close() + v.waitForLocksRelease() + if v.td != nil { + v.td.Close() + } + if v.ts != nil { + v.ts.Close() + } log.Infof("VTOrc closed") } // waitForLocksRelease is used to wait for release of locks -func waitForLocksRelease() { +func (v *VTOrc) waitForLocksRelease() { timeout := time.After(shutdownWaitTime) for { count := atomic.LoadInt32(&shardsLockCounter) @@ -127,7 +157,7 @@ func waitForLocksRelease() { // handleDiscoveryRequests iterates the discoveryQueue channel and calls upon // instance discovery per entry. -func handleDiscoveryRequests() { +func (v *VTOrc) handleDiscoveryRequests() { discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT") // create a pool of discovery workers for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { @@ -144,7 +174,7 @@ func handleDiscoveryRequests() { // DiscoverInstance will attempt to discover (poll) an instance (unless // it is already up-to-date) and will also ensure that its primary and // replicas (if any) are also checked. -func DiscoverInstance(tabletAlias string, forceDiscovery bool) { +func (v *VTOrc) DiscoverInstance(tabletAlias string, forceDiscovery bool) { if inst.InstanceIsForgotten(tabletAlias) { log.Infof("discoverInstance: skipping discovery of %+v because it is set to be forgotten", tabletAlias) return @@ -238,7 +268,7 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) { } // onHealthTick handles the actions to take to discover/poll instances -func onHealthTick() { +func (v *VTOrc) onHealthTick() { tabletAliases, err := inst.ReadOutdatedInstanceKeys() if err != nil { log.Error(err) @@ -269,7 +299,7 @@ func onHealthTick() { // periodically investigated and their status captured, and long since unseen instances are // purged and forgotten. // nolint SA1015: using time.Tick leaks the underlying ticker -func ContinuousDiscovery() { +func (v *VTOrc) ContinuousDiscovery() { log.Infof("continuous discovery: setting up") recentDiscoveryOperationKeys = cache.New(instancePollSecondsDuration(), time.Second) @@ -328,30 +358,27 @@ func ContinuousDiscovery() { go inst.SnapshotTopologies() }() case <-tabletTopoTick: - refreshAllInformation() + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + v.RefreshAllInformation(ctx) } } } // refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks. -func refreshAllInformation() { - // Create a wait group - var wg sync.WaitGroup +func (v *VTOrc) RefreshAllInformation(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) // Refresh all keyspace information. - wg.Add(1) - go func() { - defer wg.Done() - RefreshAllKeyspacesAndShards() - }() + eg.Go(func() error { + return RefreshAllKeyspacesAndShards(ctx) + }) // Refresh all tablets. - wg.Add(1) - go func() { - defer wg.Done() - refreshAllTablets() - }() + eg.Go(func() error { + return v.td.RefreshAllTablets(ctx) + }) // Wait for both the refreshes to complete - wg.Wait() + return eg.Wait() }