diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 3917522a2f8..d34b4404df7 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -99,6 +99,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index dced769ca78..08b1c59f4cc 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -57,6 +57,7 @@ func init() { servenv.OnParseFor("vtcombo", registerFlags) servenv.OnParseFor("vtctld", registerFlags) servenv.OnParseFor("vtgate", registerFlags) + servenv.OnParseFor("vtorc", registerFlags) } // KeyspaceInfo is a meta struct that contains metadata to give the diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index ba3a02a68ca..eb4321f66d6 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -28,20 +28,20 @@ import ( "sync" "time" - "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" + "golang.org/x/sync/errgroup" + "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/event" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/events" "vitess.io/vitess/go/vt/topo/topoproto" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vterrors" ) const ( @@ -616,6 +616,71 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac return result, err } +// GetTabletsByShard returns the tablets in the given shard using all cells. +// It can return ErrPartialResult if it couldn't read all the cells, or all +// the individual tablets, in which case the result is valid, but partial. +func (ts *Server) GetTabletsByShard(ctx context.Context, keyspace, shard string) ([]*TabletInfo, error) { + return ts.GetTabletsByShardCell(ctx, keyspace, shard, nil) +} + +// GetTabletsByShardCell returns the tablets in the given shard. It can return +// ErrPartialResult if it couldn't read all the cells, or all the individual +// tablets, in which case the result is valid, but partial. +func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard string, cells []string) ([]*TabletInfo, error) { + span, ctx := trace.NewSpan(ctx, "topo.GetTabletsByShardCell") + span.Annotate("keyspace", keyspace) + span.Annotate("shard", shard) + span.Annotate("num_cells", len(cells)) + defer span.Finish() + ctx = trace.NewContext(ctx, span) + var err error + + if len(cells) == 0 { + cells, err = ts.GetCellInfoNames(ctx) + if err != nil { + return nil, err + } + if len(cells) == 0 { // Nothing to do + return nil, nil + } + } + + // divide the concurrency limit by the number of cells. if there are more + // cells than the limit, default to concurrency of 1. + cellConcurrency := 1 + if len(cells) < DefaultConcurrency { + cellConcurrency = DefaultConcurrency / len(cells) + } + + mu := sync.Mutex{} + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(DefaultConcurrency) + + tablets := make([]*TabletInfo, 0, len(cells)*3) + for _, cell := range cells { + eg.Go(func() error { + t, err := ts.GetTabletsByCell(ctx, cell, &GetTabletsByCellOptions{ + Concurrency: cellConcurrency, + Keyspace: keyspace, + Shard: shard, + }) + if err != nil { + return vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell)) + } + mu.Lock() + tablets = append(tablets, t...) + mu.Unlock() + return nil + }) + } + if err := eg.Wait(); err != nil { + log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err) + return tablets, NewError(PartialResult, shard) + } + + return tablets, nil +} + // GetTabletMapForShard returns the tablets for a shard. It can return // ErrPartialResult if it couldn't read all the cells, or all // the individual tablets, in which case the map is valid, but partial. diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index 671a0f43905..5c9c9b52463 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -236,6 +236,10 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t type GetTabletsByCellOptions struct { // Concurrency controls the maximum number of concurrent calls to GetTablet. Concurrency int + // Keyspace is the optional keyspace tablets must match. + Keyspace string + // Shard is the optional shard tablets must match. + Shard string } // GetTabletsByCell returns all the tablets in the cell. @@ -263,13 +267,21 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G return nil, err } - tablets := make([]*TabletInfo, len(listResults)) + tablets := make([]*TabletInfo, 0) for n := range listResults { tablet := &topodatapb.Tablet{} if err := tablet.UnmarshalVT(listResults[n].Value); err != nil { return nil, err } - tablets[n] = &TabletInfo{Tablet: tablet, version: listResults[n].Version} + if opt != nil && opt.Keyspace != "" { + if opt.Keyspace != tablet.Keyspace { + continue + } + if opt.Shard != "" && opt.Shard != tablet.Shard { + continue + } + } + tablets = append(tablets, &TabletInfo{Tablet: tablet, version: listResults[n].Version}) } return tablets, nil diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go index 3a0153a11b5..5359eac09c3 100644 --- a/go/vt/topo/tablet_test.go +++ b/go/vt/topo/tablet_test.go @@ -34,43 +34,63 @@ import ( // GetTabletsByCell first tries to get all the tablets using List. // If the response is too large, we will get an error, and fall back to one tablet at a time. func TestServerGetTabletsByCell(t *testing.T) { + const cell = "zone1" + const keyspace = "keyspace" + const shard = "shard" + tests := []struct { - name string - tablets int - opt *topo.GetTabletsByCellOptions - listError error + name string + createShardTablets int + opt *topo.GetTabletsByCellOptions + listError error + keyspaceShards map[string]string }{ { - name: "negative concurrency", - tablets: 1, + name: "negative concurrency", + keyspaceShards: map[string]string{keyspace: shard}, + createShardTablets: 1, // Ensure this doesn't panic. opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, }, { - name: "single", - tablets: 1, + name: "single", + keyspaceShards: map[string]string{keyspace: shard}, + createShardTablets: 1, // Make sure the defaults apply as expected. opt: nil, }, { - name: "multiple", + name: "multiple", + keyspaceShards: map[string]string{keyspace: shard}, // should work with more than 1 tablet - tablets: 32, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + createShardTablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, }, { - name: "multiple with list error", + name: "multiple with list error", + keyspaceShards: map[string]string{keyspace: shard}, // should work with more than 1 tablet when List returns an error - tablets: 32, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, - listError: topo.NewError(topo.ResourceExhausted, ""), + createShardTablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + listError: topo.NewError(topo.ResourceExhausted, ""), + }, + { + name: "filtered by keyspace and shard", + keyspaceShards: map[string]string{ + keyspace: shard, + "filtered": "-", + }, + // should create 2 tablets in 2 different shards (4 total) + // but only a single shard is returned + createShardTablets: 2, + opt: &topo.GetTabletsByCellOptions{ + Concurrency: 1, + Keyspace: keyspace, + Shard: shard, + }, }, } - const cell = "zone1" - const keyspace = "keyspace" - const shard = "shard" - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -84,38 +104,49 @@ func TestServerGetTabletsByCell(t *testing.T) { // Create an ephemeral keyspace and generate shard records within // the keyspace to fetch later. - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablets := make([]*topo.TabletInfo, tt.tablets) - - for i := 0; i < tt.tablets; i++ { - tablet := &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{ - Cell: cell, - Uid: uint32(i), - }, - Hostname: "host1", - PortMap: map[string]int32{ - "vt": int32(i), - }, - Keyspace: keyspace, - Shard: shard, + for k, s := range tt.keyspaceShards { + require.NoError(t, ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, k, s)) + } + + tablets := make([]*topo.TabletInfo, tt.createShardTablets) + + var uid uint32 = 1 + for k, s := range tt.keyspaceShards { + for i := 0; i < tt.createShardTablets; i++ { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uid, + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(uid), + }, + Keyspace: k, + Shard: s, + } + tInfo := &topo.TabletInfo{Tablet: tablet} + tablets[i] = tInfo + require.NoError(t, ts.CreateTablet(ctx, tablet)) + uid++ } - tInfo := &topo.TabletInfo{Tablet: tablet} - tablets[i] = tInfo - require.NoError(t, ts.CreateTablet(ctx, tablet)) } // Verify that we return a complete list of tablets and that each // tablet matches what we expect. out, err := ts.GetTabletsByCell(ctx, cell, tt.opt) require.NoError(t, err) - require.Len(t, out, tt.tablets) + require.Len(t, out, tt.createShardTablets) for i, tab := range tablets { require.Equal(t, tab.Tablet, tablets[i].Tablet) } + + for _, tablet := range out { + require.Equal(t, keyspace, tablet.Keyspace) + require.Equal(t, shard, tablet.Shard) + } }) } } diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 25820bc5184..e9bbcee35cb 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -35,7 +35,6 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" @@ -156,7 +155,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { } func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) { - tablets, err := topotools.GetTabletMapForCell(ctx, ts, cell) + tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency}) if err != nil { log.Errorf("Error fetching topo info for cell %v: %v", cell, err) return @@ -188,7 +187,7 @@ func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) { } func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { - tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard) + tablets, err := ts.GetTabletsByShard(ctx, keyspace, shard) if err != nil { log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err) return @@ -198,7 +197,7 @@ func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, refreshTablets(tablets, query, args, loader, forceRefresh, tabletsToIgnore) } -func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { +func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { // Discover new tablets. latestInstances := make(map[string]bool) var wg sync.WaitGroup