diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index a25250a8..ae70b837 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -254,7 +254,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *arbv1.AppWrapper: - klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s &qj=%p qj=%+v", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender, t, t) + klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s ", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender) // todo: This is a current workaround for duplicate message bug. // if t.Status.Local == true { // ignore duplicate message from cache // return false @@ -345,6 +345,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { newjob.Status.CanRun = false newjob.Status.FilterIgnore = true // update QueueJobState only cleanAppWrapper := false + generatedCondition := false // If dispatch deadline is exceeded no matter what the state of AW, kill the job and set status as Failed. if (newjob.Status.State == arbv1.AppWrapperStateActive) && (newjob.Spec.SchedSpec.DispatchDuration.Limit > 0) { if newjob.Spec.SchedSpec.DispatchDuration.Overrun { @@ -370,6 +371,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { } // cannot use cleanup AW, since it puts AW back in running state qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob) + generatedCondition = true } } @@ -412,7 +414,8 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { } updateNewJob = newjob.DeepCopy() - } else { + generatedCondition = true + } else if newjob.Status.Running == 0 && newjob.Status.Succeeded == 0 && newjob.Status.State == arbv1.AppWrapperStateActive { // If pods failed scheduling generate new preempt condition message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running) index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling") @@ -427,22 +430,25 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { } updateNewJob = newjob.DeepCopy() + generatedCondition = true } - err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning") - if err != nil { - klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err) - return - } + if generatedCondition { + err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning") + if err != nil { + klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err) + return + } - if cleanAppWrapper { - klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name) - go qjm.Cleanup(ctx, updateNewJob) - } else { - // Only back-off AWs that are in state running and not in state Failed - if updateNewJob.Status.State != arbv1.AppWrapperStateFailed { - klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name) - qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) + if cleanAppWrapper { + klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name) + go qjm.Cleanup(ctx, updateNewJob) + } else { + // Only back-off AWs that are in state running and not in state Failed + if updateNewJob.Status.State != arbv1.AppWrapperStateFailed { + klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name) + qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) + } } } } @@ -1362,7 +1368,7 @@ func (cc *XController) updateStatusInEtcd(ctx context.Context, currentAppwrapper func (cc *XController) updateStatusInEtcdWithRetry(ctx context.Context, source *arbv1.AppWrapper, caller string) error { klog.V(4).Infof("[updateStatusInEtcdWithMergeFunction] trying to update '%s/%s' version '%s' called by '%s'", source.Namespace, source.Name, source.ResourceVersion, caller) source.Status.Sender = "before " + caller // set Sender string to indicate code location - updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(10, 100*time.Millisecond), &EtcdErrorClassifier{}) + updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(1, 100*time.Millisecond), &EtcdErrorClassifier{}) updateStatusRetrierRetrier.SetJitter(0.05) updatedAW := source.DeepCopy() err := updateStatusRetrierRetrier.RunCtx(ctx, func(localContext context.Context) error { @@ -1559,10 +1565,10 @@ func (cc *XController) addQueueJob(obj interface{}) { firstTime := metav1.NowMicro() qj, ok := obj.(*arbv1.AppWrapper) if !ok { - klog.Errorf("[Informer-addQJ] object is not AppWrapper. object=%+v", obj) + klog.Errorf("[Informer-addQJ] object is not AppWrapper.") return } - klog.V(6).Infof("[Informer-addQJ] %s/%s &qj=%p qj=%+v", qj.Namespace, qj.Name, qj, qj) + klog.V(6).Infof("[Informer-addQJ] %s/%s", qj.Namespace, qj.Name) if qj.Status.QueueJobState == "" { qj.Status.ControllerFirstTimestamp = firstTime qj.Status.SystemPriority = float64(qj.Spec.Priority) @@ -1597,7 +1603,7 @@ func (cc *XController) addQueueJob(obj interface{}) { // updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate // on stale AWs. This has potential to improve performance at scale. if hasCompletionStatus { - requeueInterval := 5 * time.Second + requeueIntervalForCompletionStatus := 5 * time.Second key, err := cache.MetaNamespaceKeyFunc(qj) if err != nil { klog.Warningf("[Informer-addQJ] Error getting AW %s/%s from cache cannot determine completion status", qj.Namespace, qj.Name) @@ -1605,10 +1611,11 @@ func (cc *XController) addQueueJob(obj interface{}) { } go func() { for { - time.Sleep(requeueInterval) + time.Sleep(requeueIntervalForCompletionStatus) latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if err != nil && !exists { - klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name) + if err != nil || !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status", qj.Namespace, qj.Name) + break } else { var latestAw *arbv1.AppWrapper if latestObj != nil { @@ -1643,8 +1650,9 @@ func (cc *XController) addQueueJob(obj interface{}) { for { time.Sleep(requeueInterval) latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if err != nil && !exists { - klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name) + if err != nil || !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling", qj.Namespace, qj.Name) + break } else { var latestAw *arbv1.AppWrapper if latestObj != nil { @@ -1752,9 +1760,9 @@ func (cc *XController) enqueue(obj interface{}) error { err := cc.eventQueue.Add(qj) // add to FIFO queue if not in, update object & keep position if already in FIFO queue if err != nil { - klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, err) + klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status, err) } else { - klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status) + klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status) } return err }