Skip to content

Commit

Permalink
cleanup routines and logger
Browse files Browse the repository at this point in the history
  • Loading branch information
asm582 committed Sep 29, 2023
1 parent 339e3a2 commit 3c3e384
Showing 1 changed file with 23 additions and 21 deletions.
44 changes: 23 additions & 21 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *arbv1.AppWrapper:
klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s &qj=%p qj=%+v", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender, t, t)
klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s ", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender)
// TODO: This is the current workaround for the duplicate message bug.
// if t.Status.Local == true { // ignore duplicate message from cache
// return false
Expand Down Expand Up @@ -440,15 +440,15 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
return
}


if cleanAppWrapper {
klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name)
go qjm.Cleanup(ctx, updateNewJob)
} else {
// Only back-off AWs that are in state running and not in state Failed
if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name)
qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
if cleanAppWrapper {
klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name)
go qjm.Cleanup(ctx, updateNewJob)
} else {
// Only back-off AWs that are in state running and not in state Failed
if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name)
qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
}
}
}
}
Expand Down Expand Up @@ -1368,7 +1368,7 @@ func (cc *XController) updateStatusInEtcd(ctx context.Context, currentAppwrapper
func (cc *XController) updateStatusInEtcdWithRetry(ctx context.Context, source *arbv1.AppWrapper, caller string) error {
klog.V(4).Infof("[updateStatusInEtcdWithMergeFunction] trying to update '%s/%s' version '%s' called by '%s'", source.Namespace, source.Name, source.ResourceVersion, caller)
source.Status.Sender = "before " + caller // set Sender string to indicate code location
updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(10, 100*time.Millisecond), &EtcdErrorClassifier{})
updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(1, 100*time.Millisecond), &EtcdErrorClassifier{})
updateStatusRetrierRetrier.SetJitter(0.05)
updatedAW := source.DeepCopy()
err := updateStatusRetrierRetrier.RunCtx(ctx, func(localContext context.Context) error {
Expand Down Expand Up @@ -1565,10 +1565,10 @@ func (cc *XController) addQueueJob(obj interface{}) {
firstTime := metav1.NowMicro()
qj, ok := obj.(*arbv1.AppWrapper)
if !ok {
klog.Errorf("[Informer-addQJ] object is not AppWrapper. object=%+v", obj)
klog.Errorf("[Informer-addQJ] object is not AppWrapper.")
return
}
klog.V(6).Infof("[Informer-addQJ] %s/%s &qj=%p qj=%+v", qj.Namespace, qj.Name, qj, qj)
klog.V(6).Infof("[Informer-addQJ] %s/%s", qj.Namespace, qj.Name)
if qj.Status.QueueJobState == "" {
qj.Status.ControllerFirstTimestamp = firstTime
qj.Status.SystemPriority = float64(qj.Spec.Priority)
Expand Down Expand Up @@ -1603,18 +1603,19 @@ func (cc *XController) addQueueJob(obj interface{}) {
// updatequeuejobs now runs as part of the informer machinery. The optimization here is to not use etcd to pull out submitted AWs and operate
// on stale AWs. This has the potential to improve performance at scale.
if hasCompletionStatus {
requeueInterval := 5 * time.Second
requeueIntervalForCompletionStatus := 5 * time.Second
key, err := cache.MetaNamespaceKeyFunc(qj)
if err != nil {
klog.Warningf("[Informer-addQJ] Error getting AW %s/%s from cache cannot determine completion status", qj.Namespace, qj.Name)
// TODO: should we return from this loop?
}
go func() {
for {
time.Sleep(requeueInterval)
time.Sleep(requeueIntervalForCompletionStatus)
latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
if err != nil && !exists {
klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name)
if err != nil || !exists {
klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status", qj.Namespace, qj.Name)
break
} else {
var latestAw *arbv1.AppWrapper
if latestObj != nil {
Expand Down Expand Up @@ -1649,8 +1650,9 @@ func (cc *XController) addQueueJob(obj interface{}) {
for {
time.Sleep(requeueInterval)
latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
if err != nil && !exists {
klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name)
if err != nil || !exists {
klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling", qj.Namespace, qj.Name)
break
} else {
var latestAw *arbv1.AppWrapper
if latestObj != nil {
Expand Down Expand Up @@ -1758,9 +1760,9 @@ func (cc *XController) enqueue(obj interface{}) error {

err := cc.eventQueue.Add(qj) // add to FIFO queue if not in, update object & keep position if already in FIFO queue
if err != nil {
klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, err)
klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status, err)
} else {
klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status)
klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status)
}
return err
}
Expand Down

0 comments on commit 3c3e384

Please sign in to comment.