Always reset failure counts on existing plans
HarrisonWAffel committed Nov 15, 2024
1 parent 14a48bc commit 41594d3
Showing 4 changed files with 15 additions and 35 deletions.
4 changes: 1 addition & 3 deletions pkg/capr/planner/etcdcreate.go
@@ -65,9 +65,7 @@ func (p *Planner) runEtcdSnapshotCreate(controlPlane *rkev1.RKEControlPlane, tok
// Notably, this function will blatantly ignore drain and concurrency options, as during an etcd snapshot operation, there is no necessity to drain nodes.
func (p *Planner) runEtcdSnapshotManagementServiceStart(controlPlane *rkev1.RKEControlPlane, tokensSecret plan.Secret, clusterPlan *plan.Plan, include roleFilter, operation string) error {
// Generate and deliver desired plan for the bootstrap/init node first.
- if err := p.reconcile(controlPlane, tokensSecret, clusterPlan, true, bootstrapTier, isEtcd, isNotInitNodeOrIsDeleting,
- "1", "", controlPlane.Spec.UpgradeStrategy.ControlPlaneDrainOptions,
- -1, 1, false, true); err != nil {
+ if err := p.reconcile(controlPlane, tokensSecret, clusterPlan, true, bootstrapTier, isEtcd, isNotInitNodeOrIsDeleting, "1", "", controlPlane.Spec.UpgradeStrategy.ControlPlaneDrainOptions, -1, 1, false); err != nil {
return err
}

2 changes: 1 addition & 1 deletion pkg/capr/planner/etcdrestore.go
@@ -672,7 +672,7 @@ func (p *Planner) runEtcdRestoreServiceStop(controlPlane *rkev1.RKEControlPlane,
stopPlan.Instructions = append(stopPlan.Instructions, generateRemoveTLSAndCredDirInstructions(controlPlane)...)
}
if !equality.Semantic.DeepEqual(server.Plan.Plan, stopPlan) {
- if err := p.store.UpdatePlan(server, stopPlan, joinedServer, 0, 0, true); err != nil {
+ if err := p.store.UpdatePlan(server, stopPlan, joinedServer, 0, 0); err != nil {
return err
}
updated = true
30 changes: 9 additions & 21 deletions pkg/capr/planner/planner.go
@@ -361,9 +361,7 @@ func (p *Planner) fullReconcile(cp *rkev1.RKEControlPlane, status rkev1.RKEContr
}

// select all etcd and then filter to just initNodes so that unavailable count is correct
- err = p.reconcile(cp, clusterSecretTokens, plan, true, bootstrapTier, isEtcd, isNotInitNodeOrIsDeleting,
- "1", "", controlPlaneDrainOptions, -1, 1,
- false, true)
+ err = p.reconcile(cp, clusterSecretTokens, plan, true, bootstrapTier, isEtcd, isNotInitNodeOrIsDeleting, "1", "", controlPlaneDrainOptions, -1, 1, false)
capr.Bootstrapped.True(&status)
firstIgnoreError, err = ignoreErrors(firstIgnoreError, err)
if err != nil {
@@ -382,18 +380,14 @@ func (p *Planner) fullReconcile(cp *rkev1.RKEControlPlane, status rkev1.RKEContr
}

// Process all nodes that have the etcd role and are NOT an init node or deleting. Only process 1 node at a time.
- err = p.reconcile(cp, clusterSecretTokens, plan, true, etcdTier, isEtcd, isInitNodeOrDeleting,
- "1", joinServer, controlPlaneDrainOptions,
- -1, 1, false, true)
+ err = p.reconcile(cp, clusterSecretTokens, plan, true, etcdTier, isEtcd, isInitNodeOrDeleting, "1", joinServer, controlPlaneDrainOptions, -1, 1, false)
firstIgnoreError, err = ignoreErrors(firstIgnoreError, err)
if err != nil {
return status, err
}

// Process all nodes that have the controlplane role and are NOT an init node or deleting.
- err = p.reconcile(cp, clusterSecretTokens, plan, true, controlPlaneTier, isControlPlane, isInitNodeOrDeleting,
- controlPlaneConcurrency, joinServer, controlPlaneDrainOptions, -1, 1,
- false, true)
+ err = p.reconcile(cp, clusterSecretTokens, plan, true, controlPlaneTier, isControlPlane, isInitNodeOrDeleting, controlPlaneConcurrency, joinServer, controlPlaneDrainOptions, -1, 1, false)
firstIgnoreError, err = ignoreErrors(firstIgnoreError, err)
if err != nil {
return status, err
@@ -411,9 +405,7 @@ func (p *Planner) fullReconcile(cp *rkev1.RKEControlPlane, status rkev1.RKEContr
}

// Process all nodes that are ONLY linux worker nodes.
- err = p.reconcile(cp, clusterSecretTokens, plan, false, workerTier, isOnlyLinuxWorker, isInitNodeOrDeleting,
- workerConcurrency, "", workerDrainOptions, -1, 1,
- false, true)
+ err = p.reconcile(cp, clusterSecretTokens, plan, false, workerTier, isOnlyLinuxWorker, isInitNodeOrDeleting, workerConcurrency, "", workerDrainOptions, -1, 1, false)
firstIgnoreError, err = ignoreErrors(firstIgnoreError, err)
if err != nil {
return status, err
@@ -436,9 +428,7 @@ func (p *Planner) fullReconcile(cp *rkev1.RKEControlPlane, status rkev1.RKEContr
resetFailureCountOnRestart = true
}

- err = p.reconcile(cp, clusterSecretTokens, plan, false, workerTier, isOnlyWindowsWorker, isInitNodeOrDeleting,
- workerConcurrency, "", workerDrainOptions, windowsMaxFailures,
- windowsMaxFailureThreshold, resetFailureCountOnRestart, false)
+ err = p.reconcile(cp, clusterSecretTokens, plan, false, workerTier, isOnlyWindowsWorker, isInitNodeOrDeleting, workerConcurrency, "", workerDrainOptions, windowsMaxFailures, windowsMaxFailureThreshold, resetFailureCountOnRestart)
firstIgnoreError, err = ignoreErrors(firstIgnoreError, err)
if err != nil {
return status, err
@@ -855,9 +845,7 @@ type reconcilable struct {
minorChange bool
}

- func (p *Planner) reconcile(controlPlane *rkev1.RKEControlPlane, tokensSecret plan.Secret, clusterPlan *plan.Plan, required bool, tierName string,
- include, exclude roleFilter, maxUnavailable, forcedJoinURL string, drainOptions rkev1.DrainOptions,
- maxFailures, failureThreshold int, resetFailureCountOnSystemAgentRestart, overwriteFailureValues bool) error {
+ func (p *Planner) reconcile(controlPlane *rkev1.RKEControlPlane, tokensSecret plan.Secret, clusterPlan *plan.Plan, required bool, tierName string, include, exclude roleFilter, maxUnavailable, forcedJoinURL string, drainOptions rkev1.DrainOptions, maxFailures, failureThreshold int, resetFailureCountOnSystemAgentRestart bool) error {
var (
ready, outOfSync, nonReady, errMachines, draining, uncordoned []string
messages = map[string][]string{}
@@ -929,14 +917,14 @@ func (p *Planner) reconcile(controlPlane *rkev1.RKEControlPlane, tokensSecret pl
logrus.Debugf("[planner] rkecluster %s/%s reconcile tier %s - setting initial plan for machine %s/%s", controlPlane.Namespace, controlPlane.Name, tierName, r.entry.Machine.Namespace, r.entry.Machine.Name)
logrus.Tracef("[planner] rkecluster %s/%s reconcile tier %s - initial plan for machine %s/%s new: %+v", controlPlane.Namespace, controlPlane.Name, tierName, r.entry.Machine.Namespace, r.entry.Machine.Name, r.desiredPlan)
outOfSync = append(outOfSync, r.entry.Machine.Name)
- if err := p.store.UpdatePlan(r.entry, r.desiredPlan, r.joinedURL, maxFailures, failureThreshold, overwriteFailureValues); err != nil {
+ if err := p.store.UpdatePlan(r.entry, r.desiredPlan, r.joinedURL, maxFailures, failureThreshold); err != nil {
return err
}
} else if r.minorChange {
logrus.Debugf("[planner] rkecluster %s/%s reconcile tier %s - minor plan change detected for machine %s/%s, updating plan immediately", controlPlane.Namespace, controlPlane.Name, tierName, r.entry.Machine.Namespace, r.entry.Machine.Name)
logrus.Tracef("[planner] rkecluster %s/%s reconcile tier %s - minor plan change for machine %s/%s old: %+v, new: %+v", controlPlane.Namespace, controlPlane.Name, tierName, r.entry.Machine.Namespace, r.entry.Machine.Name, r.entry.Plan.Plan, r.desiredPlan)
outOfSync = append(outOfSync, r.entry.Machine.Name)
- if err := p.store.UpdatePlan(r.entry, r.desiredPlan, r.joinedURL, maxFailures, failureThreshold, overwriteFailureValues); err != nil {
+ if err := p.store.UpdatePlan(r.entry, r.desiredPlan, r.joinedURL, maxFailures, failureThreshold); err != nil {
return err
}
} else if r.change {
Expand All @@ -960,7 +948,7 @@ func (p *Planner) reconcile(controlPlane *rkev1.RKEControlPlane, tokensSecret pl
// Drain is done (or didn't need to be done) and there are no errors, so the plan should be updated to enact the reason the node was drained.
logrus.Debugf("[planner] rkecluster %s/%s reconcile tier %s - major plan change for machine %s/%s", controlPlane.Namespace, controlPlane.Name, tierName, r.entry.Machine.Namespace, r.entry.Machine.Name)
logrus.Tracef("[planner] rkecluster %s/%s reconcile tier %s - major plan change for machine %s/%s old: %+v, new: %+v", controlPlane.Namespace, controlPlane.Name, tierName, r.entry.Machine.Namespace, r.entry.Machine.Name, r.entry.Plan.Plan, r.desiredPlan)
- if err = p.store.UpdatePlan(r.entry, r.desiredPlan, r.joinedURL, maxFailures, failureThreshold, overwriteFailureValues); err != nil {
+ if err = p.store.UpdatePlan(r.entry, r.desiredPlan, r.joinedURL, maxFailures, failureThreshold); err != nil {
return err
} else if r.entry.Metadata.Annotations[capr.DrainDoneAnnotation] != "" {
messages[r.entry.Machine.Name] = append(messages[r.entry.Machine.Name], "drain completed")
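Because the updated calls above are collapsed onto single lines, the positional arguments are easy to misread. The stand-alone Go snippet below simply pairs the arguments of the bootstrap-tier reconcile call in fullReconcile with the parameter names from the new reconcile signature; the mapping is inferred from this diff and the snippet is illustrative only, not code from the repository.

package main

import "fmt"

// Illustrative only: maps the positional arguments of the bootstrap-tier
// p.reconcile(...) call in fullReconcile to the parameter names of the
// updated reconcile signature shown above.
func main() {
	args := [][2]string{
		{"required", "true"},
		{"tierName", "bootstrapTier"},
		{"include", "isEtcd"},
		{"exclude", "isNotInitNodeOrIsDeleting"},
		{"maxUnavailable", `"1"`},
		{"forcedJoinURL", `""`},
		{"drainOptions", "controlPlaneDrainOptions"},
		{"maxFailures", "-1"},
		{"failureThreshold", "1"},
		{"resetFailureCountOnSystemAgentRestart", "false"},
	}
	for _, a := range args {
		fmt.Printf("%-40s = %s\n", a[0], a[1])
	}
}

The other reconcile call sites changed in this file follow the same pattern, differing only in tier, role filters, concurrency, and, for the Windows worker tier, the failure values (windowsMaxFailures, windowsMaxFailureThreshold, resetFailureCountOnRestart).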
14 changes: 4 additions & 10 deletions pkg/capr/planner/store.go
@@ -357,7 +357,7 @@ func (p *PlanStore) getPlanSecretFromMachine(machine *capi.Machine) (*corev1.Sec

// UpdatePlan should not be called directly as it will not block further progress if the plan is not in sync
// maxFailures is the number of attempts the system-agent will make to run the plan (in a failed state). failureThreshold is used to determine when the plan has failed.
- func (p *PlanStore) UpdatePlan(entry *planEntry, newNodePlan plan.NodePlan, joinedTo string, maxFailures, failureThreshold int, overwriteFailureKeys bool) error {
+ func (p *PlanStore) UpdatePlan(entry *planEntry, newNodePlan plan.NodePlan, joinedTo string, maxFailures, failureThreshold int) error {
if maxFailures < failureThreshold && failureThreshold != -1 && maxFailures != -1 {
return fmt.Errorf("failureThreshold (%d) cannot be greater than maxFailures (%d)", failureThreshold, maxFailures)
}
@@ -400,21 +400,15 @@ func (p *PlanStore) UpdatePlan(entry *planEntry, newNodePlan plan.NodePlan, join
// If the plan is being updated, then delete the probe-statuses so their healthy status will be reported as healthy only when they pass.
delete(secret.Data, "probe-statuses")

- _, maxFailuresHasBeenSet := secret.Data["max-failures"]
secret.Data["plan"] = data
if maxFailures > 0 || maxFailures == -1 {
- if !maxFailuresHasBeenSet || maxFailuresHasBeenSet && overwriteFailureKeys {
- secret.Data["max-failures"] = []byte(strconv.Itoa(maxFailures))
- }
+ secret.Data["max-failures"] = []byte(strconv.Itoa(maxFailures))
} else {
delete(secret.Data, "max-failures")
}

- _, failureThresholdHasBeenSet := secret.Data["failure-threshold"]
if failureThreshold > 0 || failureThreshold == -1 {
- if !failureThresholdHasBeenSet || failureThresholdHasBeenSet && overwriteFailureKeys {
- secret.Data["failure-threshold"] = []byte(strconv.Itoa(failureThreshold))
- }
+ secret.Data["failure-threshold"] = []byte(strconv.Itoa(failureThreshold))
} else {
delete(secret.Data, "failure-threshold")
}
@@ -483,7 +477,7 @@ func (p *PlanStore) removePlanSecretLabel(entry *planEntry, key string) error {
// assignAndCheckPlan assigns the given newPlan to the designated server in the planEntry, and will return nil if the plan is assigned and in sync.
func assignAndCheckPlan(store *PlanStore, msg string, entry *planEntry, newPlan plan.NodePlan, joinedTo string, failureThreshold, maxRetries int) error {
if entry.Plan == nil || !equality.Semantic.DeepEqual(entry.Plan.Plan, newPlan) {
- if err := store.UpdatePlan(entry, newPlan, joinedTo, failureThreshold, maxRetries, true); err != nil {
+ if err := store.UpdatePlan(entry, newPlan, joinedTo, failureThreshold, maxRetries); err != nil {
return err
}
return errWaiting(fmt.Sprintf("starting %s", msg))
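The store.go hunks above are the heart of the commit: the *HasBeenSet guards and the overwriteFailureKeys flag are gone, so max-failures and failure-threshold are rewritten in the plan secret on every plan update instead of being preserved when already present. Below is a minimal, self-contained sketch of the new behaviour, reconstructed from the diff for illustration; it is not the actual Rancher code.

package main

import (
	"fmt"
	"strconv"
)

// updateFailureKeys mirrors the post-commit logic in PlanStore.UpdatePlan:
// the failure keys are written unconditionally whenever a plan is delivered,
// so values left over from a previous plan are always replaced.
func updateFailureKeys(data map[string][]byte, maxFailures, failureThreshold int) {
	if maxFailures > 0 || maxFailures == -1 {
		data["max-failures"] = []byte(strconv.Itoa(maxFailures))
	} else {
		delete(data, "max-failures")
	}
	if failureThreshold > 0 || failureThreshold == -1 {
		data["failure-threshold"] = []byte(strconv.Itoa(failureThreshold))
	} else {
		delete(data, "failure-threshold")
	}
}

func main() {
	// A plan secret still carrying values from an earlier plan.
	secretData := map[string][]byte{
		"max-failures":      []byte("5"),
		"failure-threshold": []byte("3"),
	}
	// Previously these keys were only overwritten when the caller opted in;
	// now they always reflect the values passed for the current plan.
	updateFailureKeys(secretData, -1, 1)
	fmt.Printf("max-failures=%s failure-threshold=%s\n",
		secretData["max-failures"], secretData["failure-threshold"])
	// Output: max-failures=-1 failure-threshold=1
}

With the overwrite decision made unconditionally here, the per-call-site plumbing becomes unnecessary, which is why the extra boolean disappears from reconcile, UpdatePlan, and assignAndCheckPlan in the files above.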
