Skip to content

Commit

Permalink
add start ordinal e2e case
Browse files Browse the repository at this point in the history
Signed-off-by: Abner-1 <[email protected]>
  • Loading branch information
ABNER-1 committed Jun 12, 2024
1 parent 68159dd commit 2387ca6
Show file tree
Hide file tree
Showing 5 changed files with 658 additions and 34 deletions.
54 changes: 25 additions & 29 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,10 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
scaleMaxUnavailable = &maxUnavailable
}
processReplicaFn := func(i int) (bool, bool, error) {
processReplicaFn := func(i int) (bool, error) {
return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i, &status, scaleMaxUnavailable)
}
if shouldExit, err := runForAllWithBreak(replicas, processReplicaFn); shouldExit || err != nil {
if shouldExit, err := runForAllWithoutBatch(replicas, processReplicaFn); shouldExit || err != nil {
updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned)
return &status, err
}
Expand Down Expand Up @@ -1004,12 +1004,12 @@ func (ssc *defaultStatefulSetControl) processReplica(
monotonic bool,
replicas []*v1.Pod, i int,
status *appsv1beta1.StatefulSetStatus,
scaleMaxUnavailable *int) (bool, bool, error) {
scaleMaxUnavailable *int) (bool, error) {
minReadySeconds := getMinReadySeconds(set)
logger := klog.FromContext(ctx)

if replicas[i] == nil {
return false, false, nil
return false, nil
}
// Note that pods with phase Succeeded will also trigger this event. This is
// because final pod phase of evicted or otherwise forcibly stopped pods
Expand All @@ -1019,32 +1019,32 @@ func (ssc *defaultStatefulSetControl) processReplica(
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if replicas[i].DeletionTimestamp == nil {
if _, err := ssc.deletePod(set, replicas[i]); err != nil {
return true, false, err
return true, err
}
}
// New pod should be generated on the next sync after the current pod is removed from etcd.
return true, false, nil
return true, nil
}
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
return true, false, err
return true, err
} else if isStale {
// If a pod has a stale PVC, no more work can be done this round.
return true, false, err
return true, err
}
}
lifecycle.SetPodLifecycle(appspub.LifecycleStateNormal)(replicas[i])
if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
msg := fmt.Sprintf("StatefulPodControl failed to create Pod error: %s", err)
condition := NewStatefulsetCondition(appsv1beta1.FailedCreatePod, v1.ConditionTrue, "", msg)
SetStatefulsetCondition(status, condition)
return true, false, err
return true, err
}
if monotonic {
// if the set does not allow bursting, return immediately
return true, false, nil
return true, nil
} else if decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
klog.V(4).Infof(
"StatefulSet %s/%s Pod %s is Creating, and break pods scale",
Expand All @@ -1053,10 +1053,10 @@ func (ssc *defaultStatefulSetControl) processReplica(
replicas[i].Name)
// here, we change break action in for-loop to exit update function
// todo: pay attention to this change
return false, true, nil
return true, nil
}
// pod created, no more work possible for this round
return false, false, nil
return false, nil
}

// If the Pod is in pending state then trigger PVC creation to create missing PVCs
Expand All @@ -1065,7 +1065,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
"StatefulSet is triggering PVC creation for pending Pod",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
return true, false, err
return true, err
}
}

Expand All @@ -1074,21 +1074,21 @@ func (ssc *defaultStatefulSetControl) processReplica(
if isTerminating(replicas[i]) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, false, nil
return true, nil
} else if isTerminating(replicas[i]) && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
klog.V(4).Infof(
"StatefulSet %s/%s Pod %s is Terminating, and break pods scale",
set.Namespace,
set.Name,
replicas[i].Name)
return false, true, nil
return true, nil
}

// Update InPlaceUpdateReady condition for pod
if res := ssc.inplaceControl.Refresh(replicas[i], nil); res.RefreshErr != nil {
klog.Errorf("StatefulSet %s/%s failed to update pod %s condition for inplace: %v",
set.Namespace, set.Name, replicas[i].Name, res.RefreshErr)
return true, false, res.RefreshErr
return true, res.RefreshErr
} else if res.DelayDuration > 0 {
durationStore.Push(getStatefulSetKey(set), res.DelayDuration)
}
Expand All @@ -1114,7 +1114,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
}
return true, false, nil
return true, nil
} else if !isAvailable && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
klog.V(4).Infof(
"StatefulSet %s/%s Pod %s is unavailable, and break pods scale",
Expand All @@ -1125,7 +1125,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
// make sure we check later
durationStore.Push(getStatefulSetKey(set), waitTime)
}
return false, true, nil
return true, nil
}
}

Expand All @@ -1141,7 +1141,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
}

if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
return false, false, nil
return false, nil
}

// Make a deep copy so we don't mutate the shared cache
Expand All @@ -1150,10 +1150,10 @@ func (ssc *defaultStatefulSetControl) processReplica(
msg := fmt.Sprintf("StatefulPodControl failed to update Pod error: %s", err)
condition := NewStatefulsetCondition(appsv1beta1.FailedUpdatePod, v1.ConditionTrue, "", msg)
SetStatefulsetCondition(status, condition)
return true, false, err
return true, err
}

return false, false, nil
return false, nil
}

func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) {
Expand Down Expand Up @@ -1188,25 +1188,21 @@ func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, err
return successes, nil
}

// runForAllWithBreak iterates through all pod objects, applying the given function until a specified condition is met.
// runForAllWithoutBatch iterates through all pod objects, applying the given function until a specified condition is met.
// The function can decide whether to continue, break out of the loop, or return an error.
// Parameters:
// - pods: An array of pointers to Pod objects, representing the collection of pods to be processed.
// - fn: A function that takes an index as a parameter and returns three values:
// 1. A boolean indicating whether to exit the current iteration.
// 2. A boolean indicating whether to break out of the loop.
// 3. An error object, in case an error occurs during function execution.
// 2. An error object, in case an error occurs during function execution.
//
// Returns:
// - A boolean indicating whether an exit condition was met or an error occurred during iteration.
// - An error object, if an error was encountered during the execution of the provided function.
func runForAllWithBreak(pods []*v1.Pod, fn func(i int) (bool, bool, error)) (bool, error) {
func runForAllWithoutBatch(pods []*v1.Pod, fn func(i int) (bool, error)) (bool, error) {
for i := range pods {
if shouldExit, shouldBreak, err := fn(i); shouldExit || err != nil {
if shouldExit, err := fn(i); shouldExit || err != nil {
return true, err
} else if shouldBreak {
//Introduce this branch to exit the for-loop while proceeding with subsequent update logic.
break
}
}
return false, nil
Expand Down
Loading

0 comments on commit 2387ca6

Please sign in to comment.