Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix to handle when next ends up in the past #650

Merged
merged 2 commits into from
Jan 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,15 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) {
delete(s.jobs, id)
}

// Jobs coming back from the executor to the scheduler that
// need to evaluated for rescheduling.
func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
j := s.jobs[id]
j.lastRun = j.nextRun

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
// job if it has reached the limit.
if j.limitRunsTo != nil {
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
Expand All @@ -288,10 +293,25 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {

next := j.next(j.lastRun)
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
// since they are one time only, they do not need rescheduling.
return
}
if next.Before(s.now()) {
// in some cases the next run time can be in the past, for example:
// - the time on the machine was incorrect and has been synced with ntp
// - the machine went to sleep, and woke up some time later
// in those cases, we want to increment to the next run in the future
// and schedule the job for that time.
for next.Before(s.now()) {
next = j.next(next)
}
}
j.nextRun = next
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
// set the actual timer on the job here and listen for
// shut down events so that the job doesn't attempt to
// run if the scheduler has been shutdown.
select {
case <-s.shutdownCtx.Done():
return
Expand All @@ -301,6 +321,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
}:
}
})
// update the job with its new next and last run times and timer.
s.jobs[id] = j
}

Expand Down
Loading