Gocron: Adding basic functionality for a distributed system
The use case: if this scheduler piggybacks on a load-balanced web app,
you may not want duplicate jobs running as a result. It's not perfect,
because there is a chance that more than one machine will have a job in a
ready state, but it does its best to throttle while keeping things simple.

This could be improved by integrating machinery, or by just using machinery in general.
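
For context, a minimal usage sketch of the new option-function hooks. This is a sketch only: the import path, Redis address, job name, and job body are assumptions; the DistributedJobName/DistributedRedisClient fields and the Every(interval, option) signature match the tests added below.

```go
package main

import (
	"fmt"
	"log"

	"github.com/go-redis/redis"
	gocron "github.com/marcsantiago/gocron" // import path assumed for this fork
)

func main() {
	// every machine in the load-balanced pool points at the same Redis instance
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // example address

	// option function applied by Every/NewJob, mirroring the tests in this commit
	distributed := func(j *gocron.Job) {
		j.DistributedJobName = "nightly-report" // must be unique per job or the redis keys collide
		j.DistributedRedisClient = client
	}

	s := gocron.NewScheduler()
	if err := s.Every(1, distributed).Second().Do(func() {
		fmt.Println("only one machine in the pool should run this tick")
	}); err != nil {
		log.Fatal(err)
	}

	// Start returns a channel; receiving on it blocks until the scheduler is stopped
	<-s.Start()
}
```

The point is that every machine can schedule the same job; Redis decides which one actually runs it on each tick.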
marcsantiago committed Jul 20, 2019
1 parent e7daca5 commit 2e1f1ba
Showing 2 changed files with 217 additions and 8 deletions.
57 changes: 49 additions & 8 deletions gocron.go
@@ -28,6 +28,9 @@ import (
"strings"
"sync"
"time"

// preferring a really stable redis client over exposing a redis interface
"github.com/go-redis/redis"
)

// globals
@@ -49,14 +52,20 @@ const (
weeks = "weeks"
)

const (
redisKey = "gocron:distributed:job:"
)

// ChangeLoc changes the default time location
func ChangeLoc(newLocation *time.Location) {
loc = newLocation
}

// Job struct keeping information about job
type Job struct {
ShouldDoImmediately    bool          // indicates that jobs should start before scheduling
DistributedRedisClient *redis.Client // if the client is passed in, the scheduler will check the redis client before running a job to coordinate with a distributed system
DistributedJobName     string        // name assigned to the distributed job; if empty and more than one job is running, a redis name collision will occur

mu *sync.Mutex
interval uint64 // pause interval * unit between runs
@@ -72,8 +81,8 @@ type Job struct {
}

// NewJob creates a new job with the time interval.
func NewJob(interval uint64) *Job {
return &Job{
func NewJob(interval uint64, options ...func(*Job)) *Job {
j := &Job{
mu: new(sync.Mutex),
interval: interval,
jobFunc: "",
@@ -85,13 +94,28 @@ func NewJob(interval uint64) *Job {
funcs: make(map[string]interface{}),
fparams: make(map[string]([]interface{})),
}

for _, option := range options {
option(j)
}
return j
}

// True if the job should be run now
func (j *Job) shouldRun() bool {
j.mu.Lock()
b := time.Now().After(j.nextRun)
j.mu.Unlock()

if b {
go func() {
time.Sleep(time.Duration(j.interval*8) * time.Second)
j.mu.Lock()
j.DistributedRedisClient.SAdd(redisKey+j.DistributedJobName, "added")
j.mu.Unlock()
}()
}

return b
}

@@ -113,7 +137,9 @@ func (j *Job) run() ([]reflect.Value, error) {
}
in[k] = reflect.ValueOf(param)
}
j.mu.Lock()
result = f.Call(in)
j.mu.Unlock()
}

j.mu.Lock()
@@ -156,6 +182,7 @@ func (j *Job) Do(jobFun interface{}, params ...interface{}) error {
j.jobFunc = fname
j.mu.Unlock()

// queue the next job; if a redis client is included, add to the set
j.scheduleNextRun()
return nil
}
@@ -231,13 +258,11 @@ func (j *Job) scheduleNextRun() error {

switch j.unit {
case days:
j.ShouldDoImmediately = true
j.mu.Lock()
j.nextRun = j.roundToMidnight(j.lastRun)
j.nextRun = j.nextRun.Add(j.atTime)
j.mu.Unlock()
case weeks:
j.ShouldDoImmediately = true
j.mu.Lock()
j.nextRun = j.roundToMidnight(j.lastRun)
dayDiff := int(j.startDay)
@@ -258,13 +283,14 @@ if err != nil {
return err
}

j.ShouldDoImmediately = true
// advance to next possible schedule
for j.nextRun.Before(now) || j.nextRun.Before(j.lastRun) {
j.ShouldDoImmediately = true
j.mu.Lock()
j.nextRun = j.nextRun.Add(period)
j.mu.Unlock()
}

return nil
}

@@ -279,7 +305,7 @@ func (j *Job) NextScheduledTime() time.Time {
// the following functions set the job's unit with seconds, minutes, hours...
func (j *Job) mustInterval(i uint64) error {
if j.interval != i {
return fmt.Errorf("interval maust be %d", i)
return fmt.Errorf("interval must be %d", i)
}
return nil
}
@@ -460,7 +486,12 @@ func (s *Scheduler) Err() error {

// RunPending runs all the jobs that are scheduled to run.
func (s *Scheduler) RunPending() error {
if s.shouldClear {
var shouldClear bool
s.mu.Lock()
shouldClear = s.shouldClear
s.mu.Unlock()

if shouldClear {
s.mu.Lock()
s.jobs = []*Job{}
s.shouldClear = false
@@ -470,6 +501,16 @@

runnableJobs, n := s.getRunnableJobs()
for i := 0; i < n; i++ {
// pop the set key; if the pop succeeds, the job will run
// popping removes the member from the shared Redis set, which means other machines running the same
// job will not run it
if runnableJobs[i].DistributedRedisClient != nil {
res := runnableJobs[i].DistributedRedisClient.SRem(redisKey+runnableJobs[i].DistributedJobName, "added")
if res.Val() == 0 {
continue
}
}

s.mu.Lock()
_, err := runnableJobs[i].run()
s.mu.Unlock()
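
To make the coordination concrete: each machine's shouldRun eventually SAdds the member "added" for a ready job, and RunPending calls SRem on the same key just before running; SRem is atomic, so only the first machine to remove the member gets a non-zero count and runs the job. Below is a minimal sketch of that claim step; the claimRun helper is hypothetical, not part of the commit, and it assumes the redisKey constant defined in the diff above.

```go
package gocron

import "github.com/go-redis/redis"

// claimRun is a hypothetical helper capturing the claim step in RunPending.
// SRem reports how many members were removed; only the first caller across
// all machines sees a non-zero count, everyone else sees 0 and skips the run.
func claimRun(client *redis.Client, jobName string) bool {
	return client.SRem(redisKey+jobName, "added").Val() > 0
}
```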
168 changes: 168 additions & 0 deletions gocron_test.go
@@ -2,8 +2,13 @@ package gocron

import (
"fmt"
"log"
"sync/atomic"
"testing"
"time"

"github.com/alicebob/miniredis"
"github.com/go-redis/redis"
)

var defaultOption = func(j *Job) {
@@ -323,3 +328,166 @@ func TestWeekdayAt(t *testing.T) {
exp = time.Date(now.Year(), now.Month(), now.Day()+1, hour, minute, 0, 0, loc)
assertEqualTime("next run", t, weekJob.nextRun, exp)
}

type foo struct {
jobNumber int64
}

func (f *foo) incr() {
atomic.AddInt64(&f.jobNumber, 1)
}

func (f *foo) getN() int64 {
return atomic.LoadInt64(&f.jobNumber)
}

const expectedNumber int64 = 10

var (
testF *foo
client *redis.Client
)

func init() {
s, err := miniredis.Run()
if err != nil {
log.Fatal(err)
}
// defer s.Close()

client = redis.NewClient(&redis.Options{
Addr: s.Addr(),
})
testF = new(foo)
}

func TestBasicDistributedJob1(t *testing.T) {
t.Parallel()

var defaultOption = func(j *Job) {
j.DistributedJobName = "counter"
j.DistributedRedisClient = client
}

sc := NewScheduler()
sc.Every(1, defaultOption).Second().Do(testF.incr)

loop:
for {
select {
case <-sc.Start():
case <-time.After(10 * time.Second):
sc.Clear()
break loop
}
}

if (expectedNumber-1 != testF.getN()) && (expectedNumber != testF.getN()) && (expectedNumber+1 != testF.getN()) {
t.Errorf("1 expected number of jobs %d, got %d", expectedNumber, testF.getN())
}

}

func TestBasicDistributedJob2(t *testing.T) {
t.Parallel()

var defaultOption = func(j *Job) {
j.DistributedJobName = "counter"
j.DistributedRedisClient = client
}

sc := NewScheduler()
sc.Every(1, defaultOption).Second().Do(testF.incr)

loop:
for {
select {
case <-sc.Start():
case <-time.After(10 * time.Second):
sc.Clear()
break loop
}
}

if (expectedNumber-1 != testF.getN()) && (expectedNumber != testF.getN()) && (expectedNumber+1 != testF.getN()) {
t.Errorf("2 expected number of jobs %d, got %d", expectedNumber, testF.getN())
}
}

func TestBasicDistributedJob3(t *testing.T) {
t.Parallel()

var defaultOption = func(j *Job) {
j.DistributedJobName = "counter"
j.DistributedRedisClient = client
}

sc := NewScheduler()
sc.Every(1, defaultOption).Second().Do(testF.incr)

loop:
for {
select {
case <-sc.Start():
case <-time.After(10 * time.Second):
sc.Clear()
break loop
}
}

if (expectedNumber-1 != testF.getN()) && (expectedNumber != testF.getN()) && (expectedNumber+1 != testF.getN()) {
t.Errorf("3 expected number of jobs %d, got %d", expectedNumber, testF.getN())
}
}

func TestBasicDistributedJob4(t *testing.T) {
t.Parallel()

var defaultOption = func(j *Job) {
j.DistributedJobName = "counter"
j.DistributedRedisClient = client
}

sc := NewScheduler()
sc.Every(1, defaultOption).Second().Do(testF.incr)

loop:
for {
select {
case <-sc.Start():
case <-time.After(10 * time.Second):
sc.Clear()
break loop
}
}

if (expectedNumber-1 != testF.getN()) && (expectedNumber != testF.getN()) && (expectedNumber+1 != testF.getN()) {
t.Errorf("4 expected number of jobs %d, got %d", expectedNumber, testF.getN())
}
}

func TestBasicDistributedJob5(t *testing.T) {
t.Parallel()

var defaultOption = func(j *Job) {
j.DistributedJobName = "counter"
j.DistributedRedisClient = client
}

sc := NewScheduler()
sc.Every(1, defaultOption).Second().Do(testF.incr)

loop:
for {
select {
case <-sc.Start():
case <-time.After(10 * time.Second):
sc.Clear()
break loop
}
}

if (expectedNumber-1 != testF.getN()) && (expectedNumber != testF.getN()) && (expectedNumber+1 != testF.getN()) {
t.Errorf("5 expected number of jobs %d, got %d", expectedNumber, testF.getN())
}
}
