Skip to content

Commit

Permalink
Improve VTOrc startup flow (#15315)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Feb 22, 2024
1 parent 0adb706 commit d8ac5a8
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 40 deletions.
11 changes: 4 additions & 7 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,18 @@ func TestAPIEndpoints(t *testing.T) {
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
RecoveryPeriodBlockSeconds: 5,
// The default topo refresh time is 3 seconds. We are intentionally making it slower for the test, so that we have time to verify
// the /debug/health output before and after the first refresh runs.
TopologyRefreshSeconds: 10,
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
// Call API with retry to ensure VTOrc is up
status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool {
return code == 0
return code != 200
})
// When VTOrc is up and hasn't run the topo-refresh, it should be healthy but DiscoveredOnce should be false.
assert.Equal(t, 500, status)
// Verify when VTOrc is healthy, it has also run the first discovery.
assert.Equal(t, 200, status)
assert.Contains(t, resp, `"Healthy": true,`)
assert.Contains(t, resp, `"DiscoveredOnce": false`)
assert.Contains(t, resp, `"DiscoveredOnce": true`)

// find primary from topo
primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -64,16 +65,24 @@ func RegisterFlags(fs *pflag.FlagSet) {
// OpenTabletDiscovery opens the vitess topo, creates the tablet manager
// client, and returns a ticker channel that drives periodic topo polling.
//
// Before handing back the channel it wipes the cached vitess_tablet rows and
// runs one complete refresh via populateAllInformation, so callers receive the
// channel only after the first refresh has finished. The channel fires every
// config.Config.TopoInformationRefreshSeconds seconds.
func OpenTabletDiscovery() <-chan time.Time {
	ts = topo.Open()
	tmc = tmclient.NewTabletManagerClient()

	// Drop whatever tablet records were cached previously; a failure here is
	// only logged and startup carries on.
	_, err := db.ExecVTOrc("delete from vitess_tablet")
	if err != nil {
		log.Error(err)
	}

	// Load everything from the topo once up front, then let the timer take over.
	populateAllInformation()

	refreshInterval := time.Duration(config.Config.TopoInformationRefreshSeconds) * time.Second
	return time.Tick(refreshInterval) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// populateAllInformation initializes all the information for VTOrc to function.
// It runs a single refresh through refreshAllInformation and, once that call
// returns, records in the process package that the first discovery cycle has
// completed.
func populateAllInformation() {
// Refresh first: the flag below must only flip after this call has returned.
refreshAllInformation()
// We have completed one full discovery cycle. We should update the process health.
// process.HealthTest reads this flag to fill in the DiscoveredOnce field.
process.FirstDiscoveryCycleComplete.Store(true)
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
func refreshAllTablets() {
refreshTabletsUsing(func(tabletAlias string) {
Expand Down
22 changes: 22 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
)

var (
Expand Down Expand Up @@ -342,3 +343,24 @@ func TestGetLockAction(t *testing.T) {
})
}
}

// TestProcessHealth tests that the health of the process reflects that we have run the first discovery once correctly.
// It swaps the package-level topo server for an in-memory one, runs
// populateAllInformation, and checks both process.FirstDiscoveryCycleComplete
// and the DiscoveredOnce field reported by process.HealthTest before and after.
func TestProcessHealth(t *testing.T) {
	// Precondition: no discovery cycle has been recorded yet.
	require.False(t, process.FirstDiscoveryCycleComplete.Load())
	originalTs := ts
	defer func() {
		// Put the package-level state back so other tests are unaffected.
		ts = originalTs
		process.FirstDiscoveryCycleComplete.Store(false)
	}()

	// Verify that in the beginning the DiscoveredOnce field is false.
	health, err := process.HealthTest()
	require.NoError(t, err)
	require.False(t, health.DiscoveredOnce)

	// Use a cancellable context and close the in-memory topo server when the
	// test ends, so the server created here is not leaked. Deferred calls run
	// in reverse order: the server is closed, the context is cancelled, and
	// only then is the original ts restored.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	memTs := memorytopo.NewServer(ctx, cell1)
	defer memTs.Close()
	ts = memTs

	populateAllInformation()
	require.True(t, process.FirstDiscoveryCycleComplete.Load())

	// Verify that after populating all information the DiscoveredOnce field is true.
	health, err = process.HealthTest()
	require.NoError(t, err)
	require.True(t, health.DiscoveredOnce)
}
56 changes: 25 additions & 31 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func closeVTOrc() {
_ = inst.AuditOperation("shutdown", "", "Triggered via SIGTERM")
// wait for the locks to be released
waitForLocksRelease()
ts.Close()
log.Infof("VTOrc closed")
}

Expand Down Expand Up @@ -335,8 +336,6 @@ func onHealthTick() {
// nolint SA1015: using time.Tick leaks the underlying ticker
func ContinuousDiscovery() {
log.Infof("continuous discovery: setting up")
continuousDiscoveryStartTime := time.Now()
checkAndRecoverWaitPeriod := 3 * instancePollSecondsDuration()
recentDiscoveryOperationKeys = cache.New(instancePollSecondsDuration(), time.Second)

go handleDiscoveryRequests()
Expand All @@ -351,10 +350,6 @@ func ContinuousDiscovery() {
snapshotTopologiesTick = time.Tick(time.Duration(config.Config.SnapshotTopologiesIntervalHours) * time.Hour)
}

runCheckAndRecoverOperationsTimeRipe := func() bool {
return time.Since(continuousDiscoveryStartTime) >= checkAndRecoverWaitPeriod
}

go func() {
_ = ometrics.InitMetrics()
}()
Expand Down Expand Up @@ -400,11 +395,7 @@ func ContinuousDiscovery() {
} else {
return
}
if runCheckAndRecoverOperationsTimeRipe() {
CheckAndRecover()
} else {
log.Infof("Waiting for %+v seconds to pass before running failure detection/recovery", checkAndRecoverWaitPeriod.Seconds())
}
CheckAndRecover()
}()
}
}()
Expand All @@ -415,27 +406,30 @@ func ContinuousDiscovery() {
}
}()
case <-tabletTopoTick:
// Create a wait group
var wg sync.WaitGroup
refreshAllInformation()
}
}
}

// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()
// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
func refreshAllInformation() {
// Create a wait group
var wg sync.WaitGroup

// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()
// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()

// Wait for both the refreshes to complete
wg.Wait()
// We have completed one discovery cycle in the entirety of it. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
}
}
// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()

// Wait for both the refreshes to complete
wg.Wait()
}
4 changes: 3 additions & 1 deletion go/vt/vtorc/process/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func RegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) {
func HealthTest() (health *HealthStatus, err error) {
cacheKey := util.ProcessToken.Hash
if healthStatus, found := lastHealthCheckCache.Get(cacheKey); found {
return healthStatus.(*HealthStatus), nil
health = healthStatus.(*HealthStatus)
health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load()
return
}

health = &HealthStatus{Healthy: false, Hostname: ThisHostname, Token: util.ProcessToken.Hash}
Expand Down

0 comments on commit d8ac5a8

Please sign in to comment.