Skip to content

Commit

Permalink
feat: improve how VTOrc handles first discovery
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Feb 21, 2024
1 parent 4ef43f6 commit 296005a
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 32 deletions.
10 changes: 10 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -70,9 +71,18 @@ func OpenTabletDiscovery() <-chan time.Time {
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
log.Error(err)
}
// We refresh all information from the topo once before we start the ticks to do it on a timer.
populateAllInformation()

Check warning on line 75 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L75

Added line #L75 was not covered by tests
return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// populateAllInformation initializes all the information for VTOrc to function.
// It runs refreshAllInformation once and, after that call returns, sets
// process.FirstDiscoveryCycleComplete so that the process health can report
// that the first discovery cycle has happened.
func populateAllInformation() {
refreshAllInformation()
// One full discovery cycle has now completed, so record that in the process health.
// This must stay after the refresh call: the flag means "a refresh has finished".
process.FirstDiscoveryCycleComplete.Store(true)
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
func refreshAllTablets() {
refreshTabletsUsing(func(tabletAlias string) {
Expand Down
22 changes: 22 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
)

var (
Expand Down Expand Up @@ -342,3 +343,24 @@ func TestGetLockAction(t *testing.T) {
})
}
}

// TestProcessHealth checks that the process health only reports a completed
// first discovery after populateAllInformation has been run.
func TestProcessHealth(t *testing.T) {
	// The flag must be unset on entry, otherwise the rest of the test proves nothing.
	require.False(t, process.FirstDiscoveryCycleComplete.Load())

	// Put the package-level topo server and the discovery flag back when we are done,
	// so that other tests in this package are not affected.
	savedTopoServer := ts
	defer func() {
		ts = savedTopoServer
		process.FirstDiscoveryCycleComplete.Store(false)
	}()

	// discoveredOnce runs a health check, requires it to succeed and returns
	// the DiscoveredOnce field of the resulting status.
	discoveredOnce := func() bool {
		status, healthErr := process.HealthTest()
		require.NoError(t, healthErr)
		return status.DiscoveredOnce
	}

	// Before any discovery has run, the health status must say so.
	require.False(t, discoveredOnce())

	// Run the initial population against a fresh in-memory topo server.
	ts = memorytopo.NewServer(context.Background(), cell1)
	populateAllInformation()
	require.True(t, process.FirstDiscoveryCycleComplete.Load())

	// After the initial population, the health status must reflect the completed discovery.
	require.True(t, discoveredOnce())
}
55 changes: 24 additions & 31 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,6 @@ func onHealthTick() {
// nolint SA1015: using time.Tick leaks the underlying ticker
func ContinuousDiscovery() {
log.Infof("continuous discovery: setting up")
continuousDiscoveryStartTime := time.Now()
checkAndRecoverWaitPeriod := 3 * instancePollSecondsDuration()
recentDiscoveryOperationKeys = cache.New(instancePollSecondsDuration(), time.Second)

go handleDiscoveryRequests()
Expand All @@ -352,10 +350,6 @@ func ContinuousDiscovery() {
snapshotTopologiesTick = time.Tick(time.Duration(config.Config.SnapshotTopologiesIntervalHours) * time.Hour)
}

runCheckAndRecoverOperationsTimeRipe := func() bool {
return time.Since(continuousDiscoveryStartTime) >= checkAndRecoverWaitPeriod
}

go func() {
_ = ometrics.InitMetrics()
}()
Expand Down Expand Up @@ -401,11 +395,7 @@ func ContinuousDiscovery() {
} else {
return
}
if runCheckAndRecoverOperationsTimeRipe() {
CheckAndRecover()
} else {
log.Infof("Waiting for %+v seconds to pass before running failure detection/recovery", checkAndRecoverWaitPeriod.Seconds())
}
CheckAndRecover()

Check warning on line 398 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L398

Added line #L398 was not covered by tests
}()
}
}()
Expand All @@ -416,27 +406,30 @@ func ContinuousDiscovery() {
}
}()
case <-tabletTopoTick:
// Create a wait group
var wg sync.WaitGroup
refreshAllInformation()

Check warning on line 409 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L409

Added line #L409 was not covered by tests
}
}
}

// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()
// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
func refreshAllInformation() {
// Create a wait group
var wg sync.WaitGroup

// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()
// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()

// Wait for both the refreshes to complete
wg.Wait()
// We have completed one discovery cycle in the entirety of it. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
}
}
// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()

// Wait for both the refreshes to complete
wg.Wait()
}
4 changes: 3 additions & 1 deletion go/vt/vtorc/process/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func RegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) {
func HealthTest() (health *HealthStatus, err error) {
cacheKey := util.ProcessToken.Hash
if healthStatus, found := lastHealthCheckCache.Get(cacheKey); found {
return healthStatus.(*HealthStatus), nil
health = healthStatus.(*HealthStatus)
health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load()
return
}

health = &HealthStatus{Healthy: false, Hostname: ThisHostname, Token: util.ProcessToken.Hash}
Expand Down

0 comments on commit 296005a

Please sign in to comment.