From d98f98b8d54e92b12767641e7c5a298573b45d9b Mon Sep 17 00:00:00 2001 From: Marc Santiago Date: Mon, 22 Jul 2019 11:39:55 -0400 Subject: [PATCH] Gocron: Correcting distributed logic --- gocron.go | 33 +---------- gocron_test.go | 153 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 154 insertions(+), 32 deletions(-) diff --git a/gocron.go b/gocron.go index eab8386..ff1c117 100644 --- a/gocron.go +++ b/gocron.go @@ -105,29 +105,12 @@ func NewJob(interval uint64, options ...func(*Job)) *Job { func (j *Job) shouldRun() bool { j.mu.Lock() b := time.Now().After(j.nextRun) - - // pop the set key if it exists and b == true then the job will run - // popping removes the item from the Distributed Redis which means other machines running the same - // job will not run - if j.DistributedRedisClient != nil { - res := j.DistributedRedisClient.SPop(redisKey + j.DistributedJobName) - if err := res.Err(); err != nil || err == redis.Nil { - // the job would have ran, so reset it to the next item in the schedule - if b { - _ = j.scheduleNextRun(false) - } - return false - } - } - j.mu.Unlock() - if b { + if j.DistributedRedisClient != nil && b { go func() { time.Sleep(time.Duration(j.interval*8) * time.Second) - j.mu.Lock() j.DistributedRedisClient.SAdd(redisKey+j.DistributedJobName, "added") - j.mu.Unlock() }() } @@ -303,16 +286,6 @@ func (j *Job) scheduleNextRun(running bool) error { j.mu.Lock() j.nextRun = j.nextRun.Add(period) j.mu.Unlock() - - if running { - // job is being rescheduled so send up another key - if j.DistributedRedisClient != nil { - if err := j.DistributedRedisClient.SAdd(redisKey + j.DistributedJobName).Err(); err != nil { - return err - } - } - } - } return nil @@ -525,9 +498,7 @@ func (s *Scheduler) RunPending() error { runnableJobs, n := s.getRunnableJobs() for i := 0; i < n; i++ { - // pop the set key if it exists and b == true then the job will run - // popping removes the item from the Distributed Redis which means other machines running the same - // job will not run + // remove the item from the set, if something was removed then it was queued if runnableJobs[i].DistributedRedisClient != nil { res := runnableJobs[i].DistributedRedisClient.SRem(redisKey+runnableJobs[i].DistributedJobName, "added") if res.Val() == 0 { diff --git a/gocron_test.go b/gocron_test.go index cc581e4..cd0dc23 100644 --- a/gocron_test.go +++ b/gocron_test.go @@ -341,10 +341,14 @@ func (f *foo) getN() int64 { return atomic.LoadInt64(&f.jobNumber) } -const expectedNumber int64 = 10 +const ( + expectedNumber int64 = 10 + expectedNumberMinute int64 = 5 +) var ( testF *foo + testF2 *foo client *redis.Client ) @@ -359,6 +363,7 @@ func init() { Addr: s.Addr(), }) testF = new(foo) + testF2 = new(foo) } func TestBasicDistributedJob1(t *testing.T) { @@ -491,3 +496,149 @@ loop: t.Errorf("5 expected number of jobs %d, got %d", expectedNumber, testF.getN()) } } + +func TestBasicDistributedJobMinute1(t *testing.T) { + if t.Skipped() { + return + } + + t.Parallel() + var defaultOption = func(j *Job) { + j.DistributedJobName = "counter" + j.DistributedRedisClient = client + } + + sc := NewScheduler() + sc.Every(1, defaultOption).Minute().Do(testF2.incr) + +loop: + for { + select { + case <-sc.Start(): + case <-time.After(60 * time.Second): + sc.Clear() + break loop + } + } + + if (expectedNumberMinute-1 != testF2.getN()) && (expectedNumberMinute != testF2.getN()) && (expectedNumberMinute+1 != testF2.getN()) { + t.Errorf("1 expected number of jobs %d, got %d", expectedNumberMinute, testF2.getN()) + } + +} + +func TestBasicDistributedJobMinute2(t *testing.T) { + if t.Skipped() { + return + } + + t.Parallel() + var defaultOption = func(j *Job) { + j.DistributedJobName = "counter" + j.DistributedRedisClient = client + } + + sc := NewScheduler() + sc.Every(1, defaultOption).Minute().Do(testF2.incr) + +loop: + for { + select { + case <-sc.Start(): + case <-time.After(60 * time.Second): + sc.Clear() + break loop + } + } + + if (expectedNumberMinute-1 != testF2.getN()) && (expectedNumberMinute != testF2.getN()) && (expectedNumberMinute+1 != testF2.getN()) { + t.Errorf("1 expected number of jobs %d, got %d", expectedNumberMinute, testF2.getN()) + } +} + +func TestBasicDistributedJobMinute3(t *testing.T) { + if t.Skipped() { + return + } + + t.Parallel() + var defaultOption = func(j *Job) { + j.DistributedJobName = "counter" + j.DistributedRedisClient = client + } + + sc := NewScheduler() + sc.Every(1, defaultOption).Minute().Do(testF2.incr) + +loop: + for { + select { + case <-sc.Start(): + case <-time.After(60 * time.Second): + sc.Clear() + break loop + } + } + + if (expectedNumberMinute-1 != testF2.getN()) && (expectedNumberMinute != testF2.getN()) && (expectedNumberMinute+1 != testF2.getN()) { + t.Errorf("1 expected number of jobs %d, got %d", expectedNumberMinute, testF2.getN()) + } +} + +func TestBasicDistributedJobMinute4(t *testing.T) { + if t.Skipped() { + return + } + + t.Parallel() + var defaultOption = func(j *Job) { + j.DistributedJobName = "counter" + j.DistributedRedisClient = client + } + + sc := NewScheduler() + sc.Every(1, defaultOption).Minute().Do(testF2.incr) + +loop: + for { + select { + case <-sc.Start(): + case <-time.After(60 * time.Second): + sc.Clear() + break loop + } + } + + if (expectedNumberMinute-1 != testF2.getN()) && (expectedNumberMinute != testF2.getN()) && (expectedNumberMinute+1 != testF2.getN()) { + t.Errorf("1 expected number of jobs %d, got %d", expectedNumberMinute, testF2.getN()) + } +} + +func TestBasicDistributedJobMinute5(t *testing.T) { + if t.Skipped() { + return + } + + t.Parallel() + var defaultOption = func(j *Job) { + j.DistributedJobName = "counter" + j.DistributedRedisClient = client + } + + sc := NewScheduler() + sc.Every(1, defaultOption).Minute().Do(testF2.incr) + +loop: + for { + select { + case <-sc.Start(): + case <-time.After(60 * time.Second): + sc.Clear() + break loop + } + } + + if (expectedNumberMinute-1 != testF2.getN()) && (expectedNumberMinute != testF2.getN()) && (expectedNumberMinute+1 != testF2.getN()) { + t.Errorf("1 expected number of jobs %d, got %d", expectedNumberMinute, testF2.getN()) + } +}