Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Nov 28, 2024
1 parent 0726ea6 commit e6d7f23
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 101 deletions.
51 changes: 29 additions & 22 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,27 @@ import (
"sync"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtorc/inst"
)

type KeyspaceShardDiscovery struct {
ts *topo.Server
}

func NewKeyspaceShardDiscovery(ts *topo.Server) *KeyspaceShardDiscovery {
return &KeyspaceShardDiscovery{ts}
}

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards() {
func (ksd *KeyspaceShardDiscovery) RefreshAllKeyspacesAndShards(ctx context.Context) {
var keyspaces []string
if len(clustersToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
getCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var err error
// Get all the keyspaces
keyspaces, err = ts.GetKeyspaces(ctx)
keyspaces, err = ksd.ts.GetKeyspaces(getCtx)
if err != nil {
log.Error(err)
return
Expand All @@ -62,7 +69,7 @@ func RefreshAllKeyspacesAndShards() {
// Sort the list of keyspaces.
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
sort.Strings(keyspaces)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for idx, keyspace := range keyspaces {
Expand All @@ -75,42 +82,42 @@ func RefreshAllKeyspacesAndShards() {
wg.Add(2)
go func(keyspace string) {
defer wg.Done()
_ = refreshKeyspaceHelper(refreshCtx, keyspace)
_ = ksd.refreshKeyspaceHelper(refreshCtx, keyspace)
}(keyspace)
go func(keyspace string) {
defer wg.Done()
_ = refreshAllShards(refreshCtx, keyspace)
_ = ksd.refreshAllShards(refreshCtx, keyspace)
}(keyspace)
}
wg.Wait()
}

// RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard.
func RefreshKeyspaceAndShard(keyspaceName string, shardName string) error {
err := refreshKeyspace(keyspaceName)
func (ksd *KeyspaceShardDiscovery) RefreshKeyspaceAndShard(ctx context.Context, keyspaceName string, shardName string) error {
err := ksd.refreshKeyspace(ctx, keyspaceName)
if err != nil {
return err
}
return refreshShard(keyspaceName, shardName)
return ksd.refreshShard(ctx, keyspaceName, shardName)
}

// refreshKeyspace refreshes the keyspace's information for the given keyspace from the topo
func refreshKeyspace(keyspaceName string) error {
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
func (ksd *KeyspaceShardDiscovery) refreshKeyspace(ctx context.Context, keyspaceName string) error {
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
return refreshKeyspaceHelper(refreshCtx, keyspaceName)
return ksd.refreshKeyspaceHelper(refreshCtx, keyspaceName)
}

// refreshShard refreshes the shard's information for the given keyspace/shard from the topo
func refreshShard(keyspaceName, shardName string) error {
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
func (ksd *KeyspaceShardDiscovery) refreshShard(ctx context.Context, keyspaceName, shardName string) error {
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
return refreshSingleShardHelper(refreshCtx, keyspaceName, shardName)
return ksd.refreshSingleShardHelper(refreshCtx, keyspaceName, shardName)
}

// refreshKeyspaceHelper is a helper function which reloads the given keyspace's information
func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {
keyspaceInfo, err := ts.GetKeyspace(ctx, keyspaceName)
func (ksd *KeyspaceShardDiscovery) refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {
keyspaceInfo, err := ksd.ts.GetKeyspace(ctx, keyspaceName)
if err != nil {
log.Error(err)
return err
Expand All @@ -123,8 +130,8 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {
}

// refreshAllShards refreshes all the shard records in the given keyspace.
func refreshAllShards(ctx context.Context, keyspaceName string) error {
shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{
func (ksd *KeyspaceShardDiscovery) refreshAllShards(ctx context.Context, keyspaceName string) error {
shardInfos, err := ksd.ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{
// Fetch shard records concurrently to speed up discovery. A typical
// Vitess cluster will have 1-3 vtorc instances deployed, so there is
// little risk of a thundering herd.
Expand All @@ -145,8 +152,8 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
}

// refreshSingleShardHelper is a helper function that refreshes the shard record of the given keyspace/shard.
func refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardName string) error {
shardInfo, err := ts.GetShard(ctx, keyspaceName, shardName)
func (ksd *KeyspaceShardDiscovery) refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardName string) error {
shardInfo, err := ksd.ts.GetShard(ctx, keyspaceName, shardName)
if err != nil {
log.Error(err)
return err
Expand Down
Loading

0 comments on commit e6d7f23

Please sign in to comment.