Skip to content

Commit

Permalink
Improve efficiency of vtorc topo calls
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Oct 31, 2024
1 parent 84067c6 commit df55e9a
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 71 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
1 change: 1 addition & 0 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
servenv.OnParseFor("vtorc", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
Expand Down
77 changes: 71 additions & 6 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import (
"sync"
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/topo/topoproto"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
)

const (
Expand Down Expand Up @@ -644,6 +644,71 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac
return result, err
}

// GetTabletsByShard returns the tablets in the given shard using all cells.
// It can return ErrPartialResult if it couldn't read all the cells, or all
// the individual tablets, in which case the result is valid, but partial.
func (ts *Server) GetTabletsByShard(ctx context.Context, keyspace, shard string) ([]*TabletInfo, error) {
return ts.GetTabletsByShardCell(ctx, keyspace, shard, nil)
}

// GetTabletsByShardCell returns the tablets in the given shard. It can return
// ErrPartialResult if it couldn't read all the cells, or all the individual
// tablets, in which case the result is valid, but partial.
func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard string, cells []string) ([]*TabletInfo, error) {
span, ctx := trace.NewSpan(ctx, "topo.GetTabletsByShardCell")
span.Annotate("keyspace", keyspace)
span.Annotate("shard", shard)
span.Annotate("num_cells", len(cells))
defer span.Finish()
ctx = trace.NewContext(ctx, span)
var err error

if len(cells) == 0 {
cells, err = ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
if len(cells) == 0 { // Nothing to do
return nil, nil
}
}

// divide the concurrency limit by the number of cells. if there are more
// cells than the limit, default to concurrency of 1.
cellConcurrency := 1
if len(cells) < DefaultConcurrency {
cellConcurrency = DefaultConcurrency / len(cells)
}

mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(DefaultConcurrency)

tablets := make([]*TabletInfo, 0, len(cells)*3)
for _, cell := range cells {
eg.Go(func() error {
t, err := ts.GetTabletsByCell(ctx, cell, &GetTabletsByCellOptions{
Concurrency: cellConcurrency,
Keyspace: keyspace,
Shard: shard,
})
if err != nil {
return vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell))
}
mu.Lock()
tablets = append(tablets, t...)
mu.Unlock()
return nil
})
}
if err := eg.Wait(); err != nil {
log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err)
return tablets, NewError(PartialResult, shard)
}

return tablets, nil
}

// GetTabletMapForShard returns the tablets for a shard. It can return
// ErrPartialResult if it couldn't read all the cells, or all
// the individual tablets, in which case the map is valid, but partial.
Expand Down
16 changes: 14 additions & 2 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t
type GetTabletsByCellOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetTablet.
Concurrency int
// Keyspace is the optional keyspace tablets must match.
Keyspace string
// Shard is the optional shard tablets must match.
Shard string
}

// GetTabletsByCell returns all the tablets in the cell.
Expand Down Expand Up @@ -316,13 +320,21 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G
return nil, err
}

tablets := make([]*TabletInfo, len(listResults))
tablets := make([]*TabletInfo, 0)
for n := range listResults {
tablet := &topodatapb.Tablet{}
if err := tablet.UnmarshalVT(listResults[n].Value); err != nil {
return nil, err
}
tablets[n] = &TabletInfo{Tablet: tablet, version: listResults[n].Version}
if opt != nil && opt.Keyspace != "" {
if opt.Keyspace != tablet.Keyspace {
continue
}
if opt.Shard != "" && opt.Shard != tablet.Shard {
continue
}
}
tablets = append(tablets, &TabletInfo{Tablet: tablet, version: listResults[n].Version})
}

return tablets, nil
Expand Down
111 changes: 71 additions & 40 deletions go/vt/topo/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,63 @@ import (
// GetTabletsByCell first tries to get all the tablets using List.
// If the response is too large, we will get an error, and fall back to one tablet at a time.
func TestServerGetTabletsByCell(t *testing.T) {
const cell = "zone1"
const keyspace = "keyspace"
const shard = "shard"

tests := []struct {
name string
tablets int
opt *topo.GetTabletsByCellOptions
listError error
name string
createShardTablets int
opt *topo.GetTabletsByCellOptions
listError error
keyspaceShards map[string]string
}{
{
name: "negative concurrency",
tablets: 1,
name: "negative concurrency",
keyspaceShards: map[string]string{keyspace: shard},
createShardTablets: 1,
// Ensure this doesn't panic.
opt: &topo.GetTabletsByCellOptions{Concurrency: -1},
},
{
name: "single",
tablets: 1,
name: "single",
keyspaceShards: map[string]string{keyspace: shard},
createShardTablets: 1,
// Make sure the defaults apply as expected.
opt: nil,
},
{
name: "multiple",
name: "multiple",
keyspaceShards: map[string]string{keyspace: shard},
// should work with more than 1 tablet
tablets: 32,
opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
createShardTablets: 32,
opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
},
{
name: "multiple with list error",
name: "multiple with list error",
keyspaceShards: map[string]string{keyspace: shard},
// should work with more than 1 tablet when List returns an error
tablets: 32,
opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
listError: topo.NewError(topo.ResourceExhausted, ""),
createShardTablets: 32,
opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
listError: topo.NewError(topo.ResourceExhausted, ""),
},
{
name: "filtered by keyspace and shard",
keyspaceShards: map[string]string{
keyspace: shard,
"filtered": "-",
},
// should create 2 tablets in 2 different shards (4 total)
// but only a single shard is returned
createShardTablets: 2,
opt: &topo.GetTabletsByCellOptions{
Concurrency: 1,
Keyspace: keyspace,
Shard: shard,
},
},
}

const cell = "zone1"
const keyspace = "keyspace"
const shard = "shard"

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -84,38 +104,49 @@ func TestServerGetTabletsByCell(t *testing.T) {

// Create an ephemeral keyspace and generate shard records within
// the keyspace to fetch later.
require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}))
require.NoError(t, ts.CreateShard(ctx, keyspace, shard))

tablets := make([]*topo.TabletInfo, tt.tablets)

for i := 0; i < tt.tablets; i++ {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: cell,
Uid: uint32(i),
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": int32(i),
},
Keyspace: keyspace,
Shard: shard,
for k, s := range tt.keyspaceShards {
require.NoError(t, ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{}))
require.NoError(t, ts.CreateShard(ctx, k, s))
}

tablets := make([]*topo.TabletInfo, tt.createShardTablets)

var uid uint32 = 1
for k, s := range tt.keyspaceShards {
for i := 0; i < tt.createShardTablets; i++ {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: cell,
Uid: uid,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": int32(uid),
},
Keyspace: k,
Shard: s,
}
tInfo := &topo.TabletInfo{Tablet: tablet}
tablets[i] = tInfo
require.NoError(t, ts.CreateTablet(ctx, tablet))
uid++
}
tInfo := &topo.TabletInfo{Tablet: tablet}
tablets[i] = tInfo
require.NoError(t, ts.CreateTablet(ctx, tablet))
}

// Verify that we return a complete list of tablets and that each
// tablet matches what we expect.
out, err := ts.GetTabletsByCell(ctx, cell, tt.opt)
require.NoError(t, err)
require.Len(t, out, tt.tablets)
require.Len(t, out, tt.createShardTablets)

for i, tab := range tablets {
require.Equal(t, tab.Tablet, tablets[i].Tablet)
}

for _, tablet := range out {
require.Equal(t, keyspace, tablet.Keyspace)
require.Equal(t, shard, tablet.Shard)
}
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
var args []any
for _, instance := range instances {
// number of columns minus 2 as last_checked and last_attempted_check
// updated with NOW()
// updated with datetime('now')
args = append(args, instance.InstanceAlias)
args = append(args, instance.Hostname)
args = append(args, instance.Port)
Expand Down Expand Up @@ -1169,7 +1169,7 @@ func RecordStaleInstanceBinlogCoordinates(tabletAlias string, binlogCoordinates
alias, binary_log_file, binary_log_pos, first_seen
)
values (
?, ?, ?, NOW()
?, ?, ?, datetime('now')
)`,
args...)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/reparentutil"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
Expand Down Expand Up @@ -203,7 +202,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}

func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {
tablets, err := topotools.GetTabletMapForCell(ctx, ts, cell)
tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency})
if err != nil {
log.Errorf("Error fetching topo info for cell %v: %v", cell, err)
return
Expand Down Expand Up @@ -235,7 +234,7 @@ func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) {
}

func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard)
tablets, err := ts.GetTabletsByShard(ctx, keyspace, shard)
if err != nil {
log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err)
return
Expand All @@ -245,7 +244,7 @@ func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string,
refreshTablets(tablets, query, args, loader, forceRefresh, tabletsToIgnore)
}

func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
// Discover new tablets.
latestInstances := make(map[string]bool)
var wg sync.WaitGroup
Expand Down
Loading

0 comments on commit df55e9a

Please sign in to comment.