Skip to content

Commit

Permalink
Gocron: Adding basic functionality for a distributed system
Browse files Browse the repository at this point in the history
The use case is if this scheduler piggybacks on a load balanced web app
you may not want duplicate jobs running as a result
  • Loading branch information
marcsantiago committed Jul 21, 2019
1 parent 3691c73 commit 13f0168
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
32 changes: 28 additions & 4 deletions gocron.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,21 @@ func NewJob(interval uint64, options ...func(*Job)) *Job {
func (j *Job) shouldRun() bool {
j.mu.Lock()
b := time.Now().After(j.nextRun)

// pop the set key if it exists and b == true then the job will run
// popping removes the item from the Distributed Redis which means other machines running the same
// job will not run
if j.DistributedRedisClient != nil {
res := j.DistributedRedisClient.SPop(redisKey + j.DistributedJobName)
if err := res.Err(); err != nil || err == redis.Nil {
// the job would have ran, so reset it to the next item in the schedule
if b {
_ = j.scheduleNextRun(false)
}
return false
}
}

j.mu.Unlock()

if b {
Expand Down Expand Up @@ -146,7 +161,7 @@ func (j *Job) run() ([]reflect.Value, error) {
j.lastRun = time.Now()
j.mu.Unlock()

err := j.scheduleNextRun()
err := j.scheduleNextRun(true)
if err != nil {
return result, err
}
Expand Down Expand Up @@ -182,8 +197,7 @@ func (j *Job) Do(jobFun interface{}, params ...interface{}) error {
j.jobFunc = fname
j.mu.Unlock()

// queue the next job, if redis is include add to set
j.scheduleNextRun()
j.scheduleNextRun(true)
return nil
}

Expand Down Expand Up @@ -248,7 +262,7 @@ func (j *Job) roundToMidnight(t time.Time) time.Time {
}

// scheduleNextRun Compute the instant when this job should run next
func (j *Job) scheduleNextRun() error {
func (j *Job) scheduleNextRun(running bool) error {
now := time.Now()
if j.lastRun == time.Unix(0, 0) {
j.mu.Lock()
Expand Down Expand Up @@ -289,6 +303,16 @@ func (j *Job) scheduleNextRun() error {
j.mu.Lock()
j.nextRun = j.nextRun.Add(period)
j.mu.Unlock()

if running {
// job is being rescheduled so send up another key
if j.DistributedRedisClient != nil {
if err := j.DistributedRedisClient.SAdd(redisKey + j.DistributedJobName).Err(); err != nil {
return err
}
}
}

}

return nil
Expand Down
20 changes: 10 additions & 10 deletions gocron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,20 @@ func TestDaily(t *testing.T) {

// schedule next run 1 day
dayJob := s.Every(1, defaultOption).Day()
dayJob.scheduleNextRun()
dayJob.scheduleNextRun(true)
exp := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, loc)
assertEqualTime("1 day", t, dayJob.nextRun, exp)

// schedule next run 2 days
dayJob = s.Every(2, defaultOption).Days()
dayJob.scheduleNextRun()
dayJob.scheduleNextRun(true)
exp = time.Date(now.Year(), now.Month(), now.Day()+2, 0, 0, 0, 0, loc)
assertEqualTime("2 days", t, dayJob.nextRun, exp)

// Job running longer than next schedule 1day 2 hours
dayJob = s.Every(1, defaultOption).Day()
dayJob.lastRun = time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+2, 0, 0, 0, loc)
dayJob.scheduleNextRun()
dayJob.scheduleNextRun(true)
exp = time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, loc)
assertEqualTime("1 day 2 hours", t, dayJob.nextRun, exp)

Expand All @@ -186,7 +186,7 @@ func TestDaily(t *testing.T) {
t.Error(err)
}

dayJob.scheduleNextRun()
dayJob.scheduleNextRun(true)
exp = time.Date(now.Year(), now.Month(), now.Day()+1, hour, minute, 0, 0, loc)
assertEqualTime("at 2 hours before now", t, dayJob.nextRun, exp)
}
Expand Down Expand Up @@ -217,14 +217,14 @@ func TestWeekdayAfterToday(t *testing.T) {
}

// First run
weekJob.scheduleNextRun()
weekJob.scheduleNextRun(true)
exp := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, loc)
assertEqualTime("first run", t, weekJob.nextRun, exp)

// Simulate job run 7 days before
weekJob.lastRun = weekJob.nextRun.AddDate(0, 0, -7)
// Next run
weekJob.scheduleNextRun()
weekJob.scheduleNextRun(true)
exp = time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, loc)
assertEqualTime("next run", t, weekJob.nextRun, exp)
}
Expand Down Expand Up @@ -254,14 +254,14 @@ func TestWeekdayBeforeToday(t *testing.T) {
weekJob = s.Every(1, defaultOption).Saturday()
}

weekJob.scheduleNextRun()
weekJob.scheduleNextRun(true)
exp := time.Date(now.Year(), now.Month(), now.Day()+6, 0, 0, 0, 0, loc)
assertEqualTime("first run", t, weekJob.nextRun, exp)

// Simulate job run 7 days before
weekJob.lastRun = weekJob.nextRun.AddDate(0, 0, -7)
// Next run
weekJob.scheduleNextRun()
weekJob.scheduleNextRun(true)
exp = time.Date(now.Year(), now.Month(), now.Day()+6, 0, 0, 0, 0, loc)
assertEqualTime("nest run", t, weekJob.nextRun, exp)
}
Expand Down Expand Up @@ -317,14 +317,14 @@ func TestWeekdayAt(t *testing.T) {
}

// First run
weekJob.scheduleNextRun()
weekJob.scheduleNextRun(true)
exp := time.Date(now.Year(), now.Month(), now.Day()+1, hour, minute, 0, 0, loc)
assertEqualTime("first run", t, weekJob.nextRun, exp)

// Simulate job run 7 days before
weekJob.lastRun = weekJob.nextRun.AddDate(0, 0, -7)
// Next run
weekJob.scheduleNextRun()
weekJob.scheduleNextRun(true)
exp = time.Date(now.Year(), now.Month(), now.Day()+1, hour, minute, 0, 0, loc)
assertEqualTime("next run", t, weekJob.nextRun, exp)
}
Expand Down

0 comments on commit 13f0168

Please sign in to comment.