
Commit

feat: remove emergently reading topology instances
Signed-off-by: Manan Gupta <[email protected]>
GuptaManan100 committed Mar 27, 2024
1 parent e9c8d52 commit 9bfc2a2
Showing 2 changed files with 0 additions and 122 deletions.
113 changes: 0 additions & 113 deletions go/vt/vtorc/logic/topology_recovery.go
@@ -23,8 +23,6 @@ import (
"math/rand/v2"
"time"

"github.com/patrickmn/go-cache"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
@@ -160,20 +158,12 @@ func NewTopologyRecoveryStep(uid string, message string) *TopologyRecoveryStep {
}
}

var emergencyReadTopologyInstanceMap *cache.Cache
var emergencyRestartReplicaTopologyInstanceMap *cache.Cache
var emergencyOperationGracefulPeriodMap *cache.Cache

func init() {
go initializeTopologyRecoveryPostConfiguration()
}

func initializeTopologyRecoveryPostConfiguration() {
config.WaitForConfigurationToBeLoaded()

emergencyReadTopologyInstanceMap = cache.New(time.Second, time.Millisecond*250)
emergencyRestartReplicaTopologyInstanceMap = cache.New(time.Second*30, time.Second)
emergencyOperationGracefulPeriodMap = cache.New(time.Second*5, time.Millisecond*500)
}

// AuditTopologyRecovery audits a single step in a topology recovery process.
@@ -310,80 +300,6 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl
return false, nil, nil
}

// Force a re-read of a topology instance; this is done because we need to substantiate a suspicion
// that we may have a failover scenario. we want to speed up reading the complete picture.
func emergentlyReadTopologyInstance(tabletAlias string, analysisCode inst.AnalysisCode) (instance *inst.Instance) {
if existsInCacheError := emergencyReadTopologyInstanceMap.Add(tabletAlias, true, cache.DefaultExpiration); existsInCacheError != nil {
// Just recently attempted
return nil
}
instance, _ = inst.ReadTopologyInstance(tabletAlias)
_ = inst.AuditOperation("emergently-read-topology-instance", tabletAlias, string(analysisCode))
return instance
}
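
The throttling in the removed function relies on a go-cache idiom: Cache.Add succeeds only when the key is absent, so a failed Add doubles as a "recently attempted" check. A minimal standalone sketch of that pattern, with illustrative names that are not part of the Vitess codebase:

package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
)

// recentAttempts remembers each key for one second; expired entries are purged every 250ms.
var recentAttempts = cache.New(time.Second, 250*time.Millisecond)

// tryOncePerSecond reports whether the caller should proceed: the first call for a key
// within any one-second window wins, later calls are throttled.
func tryOncePerSecond(key string) bool {
	// Add returns an error when the key already exists, i.e. a recent attempt was made.
	if err := recentAttempts.Add(key, true, cache.DefaultExpiration); err != nil {
		return false
	}
	return true
}

func main() {
	fmt.Println(tryOncePerSecond("zone1-0000000100")) // true: first attempt proceeds
	fmt.Println(tryOncePerSecond("zone1-0000000100")) // false: throttled for the next second
}
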

// Force reading of replicas of given instance. This is because we suspect the instance is dead, and want to speed up
// detection of replication failure from its replicas.
func emergentlyReadTopologyInstanceReplicas(primaryHost string, primaryPort int, analysisCode inst.AnalysisCode) {
replicas, err := inst.ReadReplicaInstancesIncludingBinlogServerSubReplicas(primaryHost, primaryPort)
if err != nil {
return
}
for _, replica := range replicas {
go emergentlyReadTopologyInstance(replica.InstanceAlias, analysisCode)
}
}

// emergentlyRestartReplicationOnTopologyInstance forces a RestartReplication on a given instance.
func emergentlyRestartReplicationOnTopologyInstance(tabletAlias string, analysisCode inst.AnalysisCode) {
if existsInCacheError := emergencyRestartReplicaTopologyInstanceMap.Add(tabletAlias, true, cache.DefaultExpiration); existsInCacheError != nil {
// Just recently attempted on this specific replica
return
}
go inst.ExecuteOnTopology(func() {
_ = restartReplication(tabletAlias)
_ = inst.AuditOperation("emergently-restart-replication-topology-instance", tabletAlias, string(analysisCode))
})
}

func beginEmergencyOperationGracefulPeriod(tabletAlias string) {
emergencyOperationGracefulPeriodMap.Set(tabletAlias, true, cache.DefaultExpiration)
}

func isInEmergencyOperationGracefulPeriod(tabletAlias string) bool {
_, found := emergencyOperationGracefulPeriodMap.Get(tabletAlias)
return found
}
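
The two helpers above implement a time-boxed flag on top of go-cache: Set records the tablet for the cache's five-second default expiration, and Get later answers whether that window is still open. A self-contained sketch of the same pattern, again with illustrative names rather than Vitess code:

package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
)

// gracePeriods holds tablets that recently had an emergency operation started on them;
// entries expire after five seconds and are cleaned up every 500ms.
var gracePeriods = cache.New(5*time.Second, 500*time.Millisecond)

// beginGracePeriod opens a five-second window for the given tablet.
func beginGracePeriod(tabletAlias string) {
	gracePeriods.Set(tabletAlias, true, cache.DefaultExpiration)
}

// inGracePeriod reports whether the window opened by beginGracePeriod is still active.
func inGracePeriod(tabletAlias string) bool {
	_, found := gracePeriods.Get(tabletAlias)
	return found
}

func main() {
	beginGracePeriod("zone1-0000000101")
	fmt.Println(inGracePeriod("zone1-0000000101")) // true while the window is open
	time.Sleep(6 * time.Second)
	fmt.Println(inGracePeriod("zone1-0000000101")) // false once the entry has expired
}
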

// emergentlyRestartReplicationOnTopologyInstanceReplicas forces a stop slave + start slave on
// replicas of a given instance, in an attempt to cause them to re-evaluate their replication state.
// This can be useful in scenarios where the primary has Too Many Connections, but long-time connected
// replicas are not seeing this; when they stop+start replication, they need to re-authenticate and
// that's where we hope they realize the primary is bad.
func emergentlyRestartReplicationOnTopologyInstanceReplicas(primaryHost string, primaryPort int, tabletAlias string, analysisCode inst.AnalysisCode) {
if existsInCacheError := emergencyRestartReplicaTopologyInstanceMap.Add(tabletAlias, true, cache.DefaultExpiration); existsInCacheError != nil {
// While each replica's RestartReplication() is throttled on its own, it's also wasteful to
// iterate all replicas all the time. This is the reason why we do grand-throttle check.
return
}
beginEmergencyOperationGracefulPeriod(tabletAlias)

replicas, err := inst.ReadReplicaInstancesIncludingBinlogServerSubReplicas(primaryHost, primaryPort)
if err != nil {
return
}
for _, replica := range replicas {
go emergentlyRestartReplicationOnTopologyInstance(replica.InstanceAlias, analysisCode)
}
}

func emergentlyRecordStaleBinlogCoordinates(tabletAlias string, binlogCoordinates *inst.BinlogCoordinates) {
err := inst.RecordStaleInstanceBinlogCoordinates(tabletAlias, binlogCoordinates)
if err != nil {
log.Error(err)
}
}

// checkAndExecuteFailureDetectionProcesses tries to register for failure detection and potentially executes
// failure-detection processes.
func checkAndExecuteFailureDetectionProcesses(analysisEntry *inst.ReplicationAnalysis) (detectionRegistrationSuccess bool, processesExecutionAttempted bool, err error) {
@@ -407,19 +323,13 @@ func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, tabletAlias
log.Infof("VTOrc not configured to run ERS, skipping recovering %v", analysisCode)
return noRecoveryFunc
}
if isInEmergencyOperationGracefulPeriod(tabletAlias) {
return recoverGenericProblemFunc
}
return recoverDeadPrimaryFunc
case inst.PrimaryTabletDeleted:
// If ERS is disabled, we have no way of repairing the cluster.
if !config.ERSEnabled() {
log.Infof("VTOrc not configured to run ERS, skipping recovering %v", analysisCode)
return noRecoveryFunc
}
if isInEmergencyOperationGracefulPeriod(tabletAlias) {
return recoverGenericProblemFunc
}
return recoverPrimaryTabletDeletedFunc
case inst.ErrantGTIDDetected:
if !config.ConvertTabletWithErrantGTIDs() {
@@ -430,9 +340,6 @@ case inst.PrimaryHasPrimary:
case inst.PrimaryHasPrimary:
return recoverPrimaryHasPrimaryFunc
case inst.LockedSemiSyncPrimary:
if isInEmergencyOperationGracefulPeriod(tabletAlias) {
return recoverGenericProblemFunc
}
return recoverLockedSemiSyncPrimaryFunc
case inst.ClusterHasNoPrimary:
return electNewPrimaryFunc
@@ -566,25 +473,6 @@ func analysisEntriesHaveSameRecovery(prevAnalysis, newAnalysis *inst.Replication
return prevRecoveryFunctionCode == newRecoveryFunctionCode
}

func runEmergentOperations(analysisEntry *inst.ReplicationAnalysis) {
switch analysisEntry.Analysis {
case inst.DeadPrimaryAndReplicas:
go emergentlyReadTopologyInstance(analysisEntry.AnalyzedInstancePrimaryAlias, analysisEntry.Analysis)
case inst.UnreachablePrimary:
go emergentlyReadTopologyInstance(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis)
go emergentlyReadTopologyInstanceReplicas(analysisEntry.AnalyzedInstanceHostname, analysisEntry.AnalyzedInstancePort, analysisEntry.Analysis)
case inst.UnreachablePrimaryWithLaggingReplicas:
go emergentlyRestartReplicationOnTopologyInstanceReplicas(analysisEntry.AnalyzedInstanceHostname, analysisEntry.AnalyzedInstancePort, analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis)
case inst.LockedSemiSyncPrimaryHypothesis:
go emergentlyReadTopologyInstance(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis)
go emergentlyRecordStaleBinlogCoordinates(analysisEntry.AnalyzedInstanceAlias, &analysisEntry.AnalyzedInstanceBinlogCoordinates)
case inst.AllPrimaryReplicasNotReplicating:
go emergentlyReadTopologyInstance(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis)
case inst.AllPrimaryReplicasNotReplicatingOrDead:
go emergentlyReadTopologyInstance(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis)
}
}

// executeCheckAndRecoverFunction will choose the correct check & recovery function based on analysis.
// It executes the function synchronously
func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (err error) {
@@ -594,7 +482,6 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
checkAndRecoverFunctionCode := getCheckAndRecoverFunctionCode(analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode)
analysisEntry.IsActionableRecovery = isActionableRecovery
runEmergentOperations(analysisEntry)

if checkAndRecoverFunctionCode == noRecoveryFunc {
// Unhandled problem type
9 changes: 0 additions & 9 deletions go/vt/vtorc/logic/topology_recovery_test.go
@@ -19,9 +19,7 @@ package logic
import (
"context"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/require"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -88,7 +86,6 @@ func TestAnalysisEntriesHaveSameRecovery(t *testing.T) {
shouldBeEqual: true,
},
}
emergencyOperationGracefulPeriodMap = cache.New(time.Second*5, time.Millisecond*500)
t.Parallel()
for _, tt := range tests {
t.Run(string(tt.prevAnalysisCode)+","+string(tt.newAnalysisCode), func(t *testing.T) {
@@ -256,12 +253,6 @@ func TestGetCheckAndRecoverFunctionCode(t *testing.T) {
},
}

// Needed for the test to work
oldMap := emergencyOperationGracefulPeriodMap
emergencyOperationGracefulPeriodMap = cache.New(time.Second*5, time.Millisecond*500)
defer func() {
emergencyOperationGracefulPeriodMap = oldMap
}()
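
The removed test setup above follows a common Go pattern for tests that touch package-level state: save the global, install a fresh value, and restore it with defer so other tests are unaffected. A generic sketch of that idiom, using an illustrative global rather than the VTOrc one:

package mypkg

import (
	"testing"
	"time"

	"github.com/patrickmn/go-cache"
)

// sharedCache stands in for any package-level variable the code under test consults.
var sharedCache = cache.New(5*time.Second, 500*time.Millisecond)

func TestWithIsolatedGlobal(t *testing.T) {
	// Swap in a fresh cache so this test neither sees nor leaves behind shared state.
	oldCache := sharedCache
	sharedCache = cache.New(5*time.Second, 500*time.Millisecond)
	defer func() { sharedCache = oldCache }()

	// ... exercise code that reads and writes sharedCache ...
	sharedCache.Set("key", true, cache.DefaultExpiration)
	if _, found := sharedCache.Get("key"); !found {
		t.Fatal("expected key to be present in the test-local cache")
	}
}
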
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
prevVal := config.ERSEnabled()
