diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index cb4e0571..ae70b837 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -254,7 +254,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *arbv1.AppWrapper: - klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s &qj=%p qj=%+v", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender, t, t) + klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s ", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender) // todo: This is a current workaround for duplicate message bug. // if t.Status.Local == true { // ignore duplicate message from cache // return false @@ -440,15 +440,15 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { return } - - if cleanAppWrapper { - klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name) - go qjm.Cleanup(ctx, updateNewJob) - } else { - // Only back-off AWs that are in state running and not in state Failed - if updateNewJob.Status.State != arbv1.AppWrapperStateFailed { - klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name) - qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) + if cleanAppWrapper { + klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name) + go qjm.Cleanup(ctx, updateNewJob) + } else { + // Only back-off AWs that are in state running and not in state Failed + if updateNewJob.Status.State != arbv1.AppWrapperStateFailed { + klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name) + qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) + } } } } @@ -1368,7 +1368,7 @@ func (cc *XController) updateStatusInEtcd(ctx context.Context, currentAppwrapper func (cc *XController) updateStatusInEtcdWithRetry(ctx context.Context, source *arbv1.AppWrapper, caller string) error { klog.V(4).Infof("[updateStatusInEtcdWithMergeFunction] trying to update '%s/%s' version '%s' called by '%s'", source.Namespace, source.Name, source.ResourceVersion, caller) source.Status.Sender = "before " + caller // set Sender string to indicate code location - updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(10, 100*time.Millisecond), &EtcdErrorClassifier{}) + updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(1, 100*time.Millisecond), &EtcdErrorClassifier{}) updateStatusRetrierRetrier.SetJitter(0.05) updatedAW := source.DeepCopy() err := updateStatusRetrierRetrier.RunCtx(ctx, func(localContext context.Context) error { @@ -1565,10 +1565,10 @@ func (cc *XController) addQueueJob(obj interface{}) { firstTime := metav1.NowMicro() qj, ok := obj.(*arbv1.AppWrapper) if !ok { - klog.Errorf("[Informer-addQJ] object is not AppWrapper. object=%+v", obj) + klog.Errorf("[Informer-addQJ] object is not AppWrapper.") return } - klog.V(6).Infof("[Informer-addQJ] %s/%s &qj=%p qj=%+v", qj.Namespace, qj.Name, qj, qj) + klog.V(6).Infof("[Informer-addQJ] %s/%s", qj.Namespace, qj.Name) if qj.Status.QueueJobState == "" { qj.Status.ControllerFirstTimestamp = firstTime qj.Status.SystemPriority = float64(qj.Spec.Priority) @@ -1603,7 +1603,7 @@ func (cc *XController) addQueueJob(obj interface{}) { // updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate // on stale AWs. This has potential to improve performance at scale. if hasCompletionStatus { - requeueInterval := 5 * time.Second + requeueIntervalForCompletionStatus := 5 * time.Second key, err := cache.MetaNamespaceKeyFunc(qj) if err != nil { klog.Warningf("[Informer-addQJ] Error getting AW %s/%s from cache cannot determine completion status", qj.Namespace, qj.Name) @@ -1611,10 +1611,11 @@ func (cc *XController) addQueueJob(obj interface{}) { } go func() { for { - time.Sleep(requeueInterval) + time.Sleep(requeueIntervalForCompletionStatus) latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if err != nil && !exists { - klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name) + if err != nil || !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status", qj.Namespace, qj.Name) + break } else { var latestAw *arbv1.AppWrapper if latestObj != nil { @@ -1649,8 +1650,9 @@ func (cc *XController) addQueueJob(obj interface{}) { for { time.Sleep(requeueInterval) latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if err != nil && !exists { - klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name) + if err != nil || !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling", qj.Namespace, qj.Name) + break } else { var latestAw *arbv1.AppWrapper if latestObj != nil { @@ -1758,9 +1760,9 @@ func (cc *XController) enqueue(obj interface{}) error { err := cc.eventQueue.Add(qj) // add to FIFO queue if not in, update object & keep position if already in FIFO queue if err != nil { - klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, err) + klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status, err) } else { - klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status) + klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status) } return err }