From 296005a78ebc4707593ef9105fe9749ae806e529 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 21 Feb 2024 11:52:45 +0530 Subject: [PATCH] feat: improve how VTOrc handles first discovery Signed-off-by: Manan Gupta --- go/vt/vtorc/logic/tablet_discovery.go | 10 ++++ go/vt/vtorc/logic/tablet_discovery_test.go | 22 +++++++++ go/vt/vtorc/logic/vtorc.go | 55 ++++++++++------------ go/vt/vtorc/process/health.go | 4 +- 4 files changed, 59 insertions(+), 32 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 47f52f08c6c..6ac79ceaf1c 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" + "vitess.io/vitess/go/vt/vtorc/process" "vitess.io/vitess/go/vt/vttablet/tmclient" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -70,9 +71,18 @@ func OpenTabletDiscovery() <-chan time.Time { if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil { log.Error(err) } + // We refresh all information from the topo once before we start the ticks to do it on an timer. + populateAllInformation() return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker } +// populateAllInformation initializes all the information for VTOrc to function. +func populateAllInformation() { + refreshAllInformation() + // We have completed one discovery cycle in the entirety of it. We should update the process health. + process.FirstDiscoveryCycleComplete.Store(true) +} + // refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while func refreshAllTablets() { refreshTabletsUsing(func(tabletAlias string) { diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 0e8ac72fabf..f79cecf9ff5 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -34,6 +34,7 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" + "vitess.io/vitess/go/vt/vtorc/process" ) var ( @@ -342,3 +343,24 @@ func TestGetLockAction(t *testing.T) { }) } } + +// TestProcessHealth tests that the health of the process reflects that we have run the first discovery once correctly. +func TestProcessHealth(t *testing.T) { + require.False(t, process.FirstDiscoveryCycleComplete.Load()) + originalTs := ts + defer func() { + ts = originalTs + process.FirstDiscoveryCycleComplete.Store(false) + }() + // Verify in the beginning, we have the first DiscoveredOnce field false. + health, err := process.HealthTest() + require.NoError(t, err) + require.False(t, health.DiscoveredOnce) + ts = memorytopo.NewServer(context.Background(), cell1) + populateAllInformation() + require.True(t, process.FirstDiscoveryCycleComplete.Load()) + // Verify after we populate all information, we have the first DiscoveredOnce field true. + health, err = process.HealthTest() + require.NoError(t, err) + require.True(t, health.DiscoveredOnce) +} diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 1a02c96a158..66c5590831b 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -336,8 +336,6 @@ func onHealthTick() { // nolint SA1015: using time.Tick leaks the underlying ticker func ContinuousDiscovery() { log.Infof("continuous discovery: setting up") - continuousDiscoveryStartTime := time.Now() - checkAndRecoverWaitPeriod := 3 * instancePollSecondsDuration() recentDiscoveryOperationKeys = cache.New(instancePollSecondsDuration(), time.Second) go handleDiscoveryRequests() @@ -352,10 +350,6 @@ func ContinuousDiscovery() { snapshotTopologiesTick = time.Tick(time.Duration(config.Config.SnapshotTopologiesIntervalHours) * time.Hour) } - runCheckAndRecoverOperationsTimeRipe := func() bool { - return time.Since(continuousDiscoveryStartTime) >= checkAndRecoverWaitPeriod - } - go func() { _ = ometrics.InitMetrics() }() @@ -401,11 +395,7 @@ func ContinuousDiscovery() { } else { return } - if runCheckAndRecoverOperationsTimeRipe() { - CheckAndRecover() - } else { - log.Infof("Waiting for %+v seconds to pass before running failure detection/recovery", checkAndRecoverWaitPeriod.Seconds()) - } + CheckAndRecover() }() } }() @@ -416,27 +406,30 @@ func ContinuousDiscovery() { } }() case <-tabletTopoTick: - // Create a wait group - var wg sync.WaitGroup + refreshAllInformation() + } + } +} - // Refresh all keyspace information. - wg.Add(1) - go func() { - defer wg.Done() - RefreshAllKeyspacesAndShards() - }() +// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks. +func refreshAllInformation() { + // Create a wait group + var wg sync.WaitGroup - // Refresh all tablets. - wg.Add(1) - go func() { - defer wg.Done() - refreshAllTablets() - }() + // Refresh all keyspace information. + wg.Add(1) + go func() { + defer wg.Done() + RefreshAllKeyspacesAndShards() + }() - // Wait for both the refreshes to complete - wg.Wait() - // We have completed one discovery cycle in the entirety of it. We should update the process health. - process.FirstDiscoveryCycleComplete.Store(true) - } - } + // Refresh all tablets. + wg.Add(1) + go func() { + defer wg.Done() + refreshAllTablets() + }() + + // Wait for both the refreshes to complete + wg.Wait() } diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 22db89e1d56..a782b2edf14 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -108,7 +108,9 @@ func RegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) { func HealthTest() (health *HealthStatus, err error) { cacheKey := util.ProcessToken.Hash if healthStatus, found := lastHealthCheckCache.Get(cacheKey); found { - return healthStatus.(*HealthStatus), nil + health = healthStatus.(*HealthStatus) + health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load() + return } health = &HealthStatus{Healthy: false, Hostname: ThisHostname, Token: util.ProcessToken.Hash}