diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 230b418247..6727aad341 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -39,7 +39,7 @@ type jobChangeType int const ( resetJobs jobChangeType = iota createJobs - updateJobs + finishedJobs removedDeploymentKey updatedHashring ) @@ -183,7 +183,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { logger.Errorf(err, "failed to end cronjob %v:%v", job.DeploymentKey, job.Verb) } else { s.jobChanges.Publish(jobChange{ - changeType: updateJobs, + changeType: finishedJobs, jobs: []dal.CronJob{updatedJob}, }) } @@ -217,8 +217,8 @@ func (s *Service) watchForUpdates(ctx context.Context) { now := s.clock.Now() next := time.Now().Add(time.Hour) // should never be reached, expect a different signal long beforehand for _, j := range state.jobs { - if possibleNext, err := s.nextCheckForJob(j, state, false); err == nil { - next = *possibleNext + if possibleNext, err := s.nextAttemptForJob(j, state, false); err == nil { + next = possibleNext break } } @@ -238,7 +238,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { case <-s.clock.After(next.Sub(now)): // Try starting jobs in db jobsToAttempt := slices.Filter(state.jobs, func(j dal.CronJob) bool { - if next, err := s.nextCheckForJob(j, state, true); err == nil { + if next, err := s.nextAttemptForJob(j, state, true); err == nil { return !next.After(time.Now().UTC()) } return false @@ -284,7 +284,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { case createJobs: logger.Tracef("adding %d jobs", len(event.jobs)) state.addJobs(event.jobs) - case updateJobs: + case finishedJobs: logger.Tracef("updating %d jobs", len(event.jobs)) state.updateJobs(event.jobs) case removedDeploymentKey: @@ -298,34 +298,34 @@ func (s *Service) watchForUpdates(ctx context.Context) { } func (s *Service) sortJobs(state *State, i, j dal.CronJob) int { - iNext, err := s.nextCheckForJob(i, state, false) + iNext, err := s.nextAttemptForJob(i, state, false) if err != nil { return 1 } - jNext, err := s.nextCheckForJob(j, state, false) + jNext, err := s.nextAttemptForJob(j, state, false) if err != nil { return -1 } - return iNext.Compare(*jNext) + return iNext.Compare(jNext) } -func (s *Service) nextCheckForJob(job dal.CronJob, state *State, allowsNow bool) (*time.Time, error) { +func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow bool) (time.Time, error) { if !s.isResponsibleForJob(job, state) { - return nil, fmt.Errorf("controller is not responsible for job") + return time.Now(), fmt.Errorf("controller is not responsible for job") } if job.State == dal.JobStateExecuting { if state.isExecutingInCurrentController(job) { // return a time in the future, meaning don't schedule at this time - return nil, fmt.Errorf("controller is already waiting for job to finish") + return time.Now(), fmt.Errorf("controller is already waiting for job to finish") } // We don't know when the other controller will finish this job // We should check again when the next execution date is assuming the job finishes next, err := gronx.NextTickAfter(job.Schedule, time.Now().UTC(), allowsNow) if err == nil { - return &next, nil + return next, nil } } - return &job.NextExecution, nil + return job.NextExecution, nil } // Synchronise the hash ring with the active controllers.