diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index c45092b811a..e36c35924b1 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -107,6 +107,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index dced769ca78..08b1c59f4cc 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -57,6 +57,7 @@ func init() { servenv.OnParseFor("vtcombo", registerFlags) servenv.OnParseFor("vtctld", registerFlags) servenv.OnParseFor("vtgate", registerFlags) + servenv.OnParseFor("vtorc", registerFlags) } // KeyspaceInfo is a meta struct that contains metadata to give the diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 7343db2c0ab..7a264356b95 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -27,20 +27,20 @@ import ( "sync" "time" - "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" + "golang.org/x/sync/errgroup" + "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/event" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/events" 
"vitess.io/vitess/go/vt/topo/topoproto" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vterrors" ) const ( @@ -644,6 +644,71 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac return result, err } +// GetTabletsByShard returns the tablets in the given shard using all cells. +// It can return ErrPartialResult if it couldn't read all the cells, or all +// the individual tablets, in which case the result is valid, but partial. +func (ts *Server) GetTabletsByShard(ctx context.Context, keyspace, shard string) ([]*TabletInfo, error) { + return ts.GetTabletsByShardCell(ctx, keyspace, shard, nil) +} + +// GetTabletsByShardCell returns the tablets in the given shard. It can return +// ErrPartialResult if it couldn't read all the cells, or all the individual +// tablets, in which case the result is valid, but partial. +func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard string, cells []string) ([]*TabletInfo, error) { + span, ctx := trace.NewSpan(ctx, "topo.GetTabletsByShardCell") + span.Annotate("keyspace", keyspace) + span.Annotate("shard", shard) + span.Annotate("num_cells", len(cells)) + defer span.Finish() + ctx = trace.NewContext(ctx, span) + var err error + + if len(cells) == 0 { + cells, err = ts.GetCellInfoNames(ctx) + if err != nil { + return nil, err + } + if len(cells) == 0 { // Nothing to do + return nil, nil + } + } + + // divide the concurrency limit by the number of cells. if there are more + // cells than the limit, default to concurrency of 1. 
+ cellConcurrency := 1 + if len(cells) < DefaultConcurrency { + cellConcurrency = DefaultConcurrency / len(cells) + } + + mu := sync.Mutex{} + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(DefaultConcurrency) + + tablets := make([]*TabletInfo, 0, len(cells)*3) + for _, cell := range cells { + eg.Go(func() error { + t, err := ts.GetTabletsByCell(ctx, cell, &GetTabletsByCellOptions{ + Concurrency: cellConcurrency, + Keyspace: keyspace, + Shard: shard, + }) + if err != nil { + return vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell)) + } + mu.Lock() + tablets = append(tablets, t...) + mu.Unlock() + return nil + }) + } + if err := eg.Wait(); err != nil { + log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err) + return tablets, NewError(PartialResult, shard) + } + + return tablets, nil +} + // GetTabletMapForShard returns the tablets for a shard. It can return // ErrPartialResult if it couldn't read all the cells, or all // the individual tablets, in which case the map is valid, but partial. diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index a9e4c5962fa..62723043629 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -289,6 +289,10 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t type GetTabletsByCellOptions struct { // Concurrency controls the maximum number of concurrent calls to GetTablet. Concurrency int + // Keyspace is the optional keyspace tablets must match. + Keyspace string + // Shard is the optional shard tablets must match. + Shard string } // GetTabletsByCell returns all the tablets in the cell. 
@@ -316,13 +320,21 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G return nil, err } - tablets := make([]*TabletInfo, len(listResults)) + tablets := make([]*TabletInfo, 0) for n := range listResults { tablet := &topodatapb.Tablet{} if err := tablet.UnmarshalVT(listResults[n].Value); err != nil { return nil, err } - tablets[n] = &TabletInfo{Tablet: tablet, version: listResults[n].Version} + if opt != nil && opt.Keyspace != "" { + if opt.Keyspace != tablet.Keyspace { + continue + } + if opt.Shard != "" && opt.Shard != tablet.Shard { + continue + } + } + tablets = append(tablets, &TabletInfo{Tablet: tablet, version: listResults[n].Version}) } return tablets, nil diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go index 3a0153a11b5..5359eac09c3 100644 --- a/go/vt/topo/tablet_test.go +++ b/go/vt/topo/tablet_test.go @@ -34,43 +34,63 @@ import ( // GetTabletsByCell first tries to get all the tablets using List. // If the response is too large, we will get an error, and fall back to one tablet at a time. func TestServerGetTabletsByCell(t *testing.T) { + const cell = "zone1" + const keyspace = "keyspace" + const shard = "shard" + tests := []struct { - name string - tablets int - opt *topo.GetTabletsByCellOptions - listError error + name string + createShardTablets int + opt *topo.GetTabletsByCellOptions + listError error + keyspaceShards map[string]string }{ { - name: "negative concurrency", - tablets: 1, + name: "negative concurrency", + keyspaceShards: map[string]string{keyspace: shard}, + createShardTablets: 1, // Ensure this doesn't panic. opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, }, { - name: "single", - tablets: 1, + name: "single", + keyspaceShards: map[string]string{keyspace: shard}, + createShardTablets: 1, // Make sure the defaults apply as expected. 
opt: nil, }, { - name: "multiple", + name: "multiple", + keyspaceShards: map[string]string{keyspace: shard}, // should work with more than 1 tablet - tablets: 32, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + createShardTablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, }, { - name: "multiple with list error", + name: "multiple with list error", + keyspaceShards: map[string]string{keyspace: shard}, // should work with more than 1 tablet when List returns an error - tablets: 32, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, - listError: topo.NewError(topo.ResourceExhausted, ""), + createShardTablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + listError: topo.NewError(topo.ResourceExhausted, ""), + }, + { + name: "filtered by keyspace and shard", + keyspaceShards: map[string]string{ + keyspace: shard, + "filtered": "-", + }, + // should create 2 tablets in 2 different shards (4 total) + // but only a single shard is returned + createShardTablets: 2, + opt: &topo.GetTabletsByCellOptions{ + Concurrency: 1, + Keyspace: keyspace, + Shard: shard, + }, }, } - const cell = "zone1" - const keyspace = "keyspace" - const shard = "shard" - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -84,38 +104,49 @@ func TestServerGetTabletsByCell(t *testing.T) { // Create an ephemeral keyspace and generate shard records within // the keyspace to fetch later. 
- require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablets := make([]*topo.TabletInfo, tt.tablets) - - for i := 0; i < tt.tablets; i++ { - tablet := &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{ - Cell: cell, - Uid: uint32(i), - }, - Hostname: "host1", - PortMap: map[string]int32{ - "vt": int32(i), - }, - Keyspace: keyspace, - Shard: shard, + for k, s := range tt.keyspaceShards { + require.NoError(t, ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, k, s)) + } + + tablets := make([]*topo.TabletInfo, tt.createShardTablets) + + var uid uint32 = 1 + for k, s := range tt.keyspaceShards { + for i := 0; i < tt.createShardTablets; i++ { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uid, + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(uid), + }, + Keyspace: k, + Shard: s, + } + tInfo := &topo.TabletInfo{Tablet: tablet} + tablets[i] = tInfo + require.NoError(t, ts.CreateTablet(ctx, tablet)) + uid++ } - tInfo := &topo.TabletInfo{Tablet: tablet} - tablets[i] = tInfo - require.NoError(t, ts.CreateTablet(ctx, tablet)) } // Verify that we return a complete list of tablets and that each // tablet matches what we expect. 
out, err := ts.GetTabletsByCell(ctx, cell, tt.opt) require.NoError(t, err) - require.Len(t, out, tt.tablets) + require.Len(t, out, tt.createShardTablets) for i, tab := range tablets { require.Equal(t, tab.Tablet, tablets[i].Tablet) } + + for _, tablet := range out { + require.Equal(t, keyspace, tablet.Keyspace) + require.Equal(t, shard, tablet.Shard) + } }) } } diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 6836095f54a..a1fa1c372f9 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -881,7 +881,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, var args []any for _, instance := range instances { // number of columns minus 2 as last_checked and last_attempted_check - // updated with NOW() + // updated with datetime('now') args = append(args, instance.InstanceAlias) args = append(args, instance.Hostname) args = append(args, instance.Port) @@ -1169,7 +1169,7 @@ func RecordStaleInstanceBinlogCoordinates(tabletAlias string, binlogCoordinates alias, binary_log_file, binary_log_pos, first_seen ) values ( - ?, ?, ?, NOW() + ?, ?, ?, datetime('now') )`, args...) 
if err != nil { diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index cd112481edc..73cd61676ca 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -37,7 +37,6 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/reparentutil" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" @@ -203,7 +202,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { } func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) { - tablets, err := topotools.GetTabletMapForCell(ctx, ts, cell) + tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency}) if err != nil { log.Errorf("Error fetching topo info for cell %v: %v", cell, err) return @@ -235,7 +234,7 @@ func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) { } func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { - tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard) + tablets, err := ts.GetTabletsByShard(ctx, keyspace, shard) if err != nil { log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err) return @@ -245,7 +244,7 @@ func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, refreshTablets(tablets, query, args, loader, forceRefresh, tabletsToIgnore) } -func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { +func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { // 
Discover new tablets. latestInstances := make(map[string]bool) var wg sync.WaitGroup diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 4b472d435f4..cc3d32ae8ec 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -95,10 +95,10 @@ func ClearActiveFailureDetections() error { _, err := db.ExecVTOrc(` update topology_failure_detection set in_active_period = 0, - end_active_period_unixtime = UNIX_TIMESTAMP() + end_active_period_unixtime = strftime('%s', 'now') where in_active_period = 1 - AND start_active_period < NOW() - INTERVAL ? MINUTE + AND start_active_period < datetime('now', printf('-%d MINUTE', ?)) `, config.FailureDetectionPeriodBlockMinutes, ) @@ -225,10 +225,10 @@ func ClearActiveRecoveries() error { _, err := db.ExecVTOrc(` update topology_recovery set in_active_period = 0, - end_active_period_unixtime = UNIX_TIMESTAMP() + end_active_period_unixtime = strftime('%s', 'now') where in_active_period = 1 - AND start_active_period < NOW() - INTERVAL ? SECOND + AND start_active_period < datetime('now', printf('-%d SECOND', ?)) `, config.Config.RecoveryPeriodBlockSeconds, ) @@ -243,7 +243,7 @@ func ClearActiveRecoveries() error { func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blockingRecoveries []*TopologyRecovery) error { for _, recovery := range blockingRecoveries { _, err := db.ExecVTOrc(` - insert + insert into blocked_topology_recovery ( alias, keyspace, @@ -256,15 +256,15 @@ func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blocking ?, ?, ?, - NOW(), + datetime('now'), ? 
) - on duplicate key update - keyspace=values(keyspace), - shard=values(shard), - analysis=values(analysis), - last_blocked_timestamp=values(last_blocked_timestamp), - blocking_recovery_id=values(blocking_recovery_id) + on conflict(alias) do update set + keyspace=excluded.keyspace, + shard=excluded.shard, + analysis=excluded.analysis, + last_blocked_timestamp=excluded.last_blocked_timestamp, + blocking_recovery_id=excluded.blocking_recovery_id `, analysisEntry.AnalyzedInstanceAlias, analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, @@ -325,7 +325,7 @@ func ExpireBlockedRecoveries() error { delete from blocked_topology_recovery where - last_blocked_timestamp < NOW() - interval ? second + last_blocked_timestamp < datetime('now', printf('-%d second', ?)) `, config.Config.RecoveryPollSeconds*2, ) if err != nil { @@ -339,16 +339,16 @@ func acknowledgeRecoveries(owner string, comment string, markEndRecovery bool, w additionalSet := `` if markEndRecovery { additionalSet = ` - end_recovery=IFNULL(end_recovery, NOW()), + end_recovery=IFNULL(end_recovery, datetime('now')), ` } query := fmt.Sprintf(` update topology_recovery set in_active_period = 0, - end_active_period_unixtime = case when end_active_period_unixtime = 0 then UNIX_TIMESTAMP() else end_active_period_unixtime end, + end_active_period_unixtime = case when end_active_period_unixtime = 0 then strftime('%%s', 'now') else end_active_period_unixtime end, %s acknowledged = 1, - acknowledged_at = NOW(), + acknowledged_at = datetime('now'), acknowledged_by = ?, acknowledge_comment = ? where diff --git a/go/vt/vtorc/process/health_dao.go b/go/vt/vtorc/process/health_dao.go index 59ea557223d..dcfed83457a 100644 --- a/go/vt/vtorc/process/health_dao.go +++ b/go/vt/vtorc/process/health_dao.go @@ -40,7 +40,7 @@ func WriteRegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) { insert ignore into node_health_history (hostname, token, first_seen_active, extra_info, command, app_version) values - (?, ?, NOW(), ?, ?, ?) 
+ (?, ?, datetime('now'), ?, ?, ?) `, nodeHealth.Hostname, nodeHealth.Token, nodeHealth.ExtraInfo, nodeHealth.Command, nodeHealth.AppVersion,