Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): enable job to support context.Context #6

Merged
merged 1 commit into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions chain.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cron

import (
"context"
"fmt"
"runtime"
"sync"
Expand Down Expand Up @@ -40,7 +41,7 @@ func (c Chain) Then(j Job) Job {
// Recover panics in wrapped jobs and log them with the provided logger.
func Recover(logger Logger) JobWrapper {
return func(j Job) Job {
return FuncJob(func() {
return FuncJob(func(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
Expand All @@ -53,7 +54,7 @@ func Recover(logger Logger) JobWrapper {
logger.Error(err, "panic", "stack", "...\n"+string(buf))
}
}()
j.Run()
j.Run(ctx)
})
}
}
Expand All @@ -64,14 +65,14 @@ func Recover(logger Logger) JobWrapper {
func DelayIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var mu sync.Mutex
return FuncJob(func() {
return FuncJob(func(ctx context.Context) {
start := time.Now()
mu.Lock()
defer mu.Unlock()
if dur := time.Since(start); dur > time.Minute {
logger.Info("delay", "duration", dur)
}
j.Run()
j.Run(ctx)
})
}
}
Expand All @@ -82,11 +83,11 @@ func SkipIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
ch := make(chan struct{}, 1)
ch <- struct{}{}
return FuncJob(func() {
return FuncJob(func(ctx context.Context) {
select {
case v := <-ch:
defer func() { ch <- v }()
j.Run()
j.Run(ctx)
default:
logger.Info("skip")
}
Expand Down
47 changes: 24 additions & 23 deletions chain_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cron

import (
"context"
"io/ioutil" //nolint:staticcheck // todo: Waiting for refactoring
"log"
"reflect"
Expand All @@ -11,7 +12,7 @@ import (

func appendingJob(slice *[]int, value int) Job {
var m sync.Mutex
return FuncJob(func() {
return FuncJob(func(context.Context) {
m.Lock()
*slice = append(*slice, value)
m.Unlock()
Expand All @@ -20,9 +21,9 @@ func appendingJob(slice *[]int, value int) Job {

func appendingWrapper(slice *[]int, value int) JobWrapper {
return func(j Job) Job {
return FuncJob(func() {
appendingJob(slice, value).Run()
j.Run()
return FuncJob(func(ctx context.Context) {
appendingJob(slice, value).Run(ctx)
j.Run(ctx)
})
}
}
Expand All @@ -35,14 +36,14 @@ func TestChain(t *testing.T) {
append3 = appendingWrapper(&nums, 3)
append4 = appendingJob(&nums, 4)
)
NewChain(append1, append2, append3).Then(append4).Run()
NewChain(append1, append2, append3).Then(append4).Run(context.Background())
if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) {
t.Error("unexpected order of calls:", nums)
}
}

func TestChainRecover(t *testing.T) {
panickingJob := FuncJob(func() {
panickingJob := FuncJob(func(context.Context) {
panic("panickingJob panics")
})

Expand All @@ -53,19 +54,19 @@ func TestChainRecover(t *testing.T) {
}
}()
NewChain().Then(panickingJob).
Run()
Run(context.Background())
})

t.Run("Recovering JobWrapper recovers", func(*testing.T) {
NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))).
Then(panickingJob).
Run()
Run(context.Background())
})

t.Run("composed with the *IfStillRunning wrappers", func(*testing.T) {
NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))).
Then(panickingJob).
Run()
Run(context.Background())
})
}

Expand All @@ -76,7 +77,7 @@ type countJob struct {
delay time.Duration
}

func (j *countJob) Run() {
func (j *countJob) Run(context.Context) {
j.m.Lock()
j.started++
j.m.Unlock()
Expand All @@ -102,7 +103,7 @@ func TestChainDelayIfStillRunning(t *testing.T) {
t.Run("runs immediately", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
if c := j.Done(); c != 1 {
t.Errorf("expected job run once, immediately, got %d", c)
Expand All @@ -113,9 +114,9 @@ func TestChainDelayIfStillRunning(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
time.Sleep(time.Millisecond)
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
}()
time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
if c := j.Done(); c != 2 {
Expand All @@ -128,9 +129,9 @@ func TestChainDelayIfStillRunning(t *testing.T) {
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
time.Sleep(time.Millisecond)
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
}()

// After 5ms, the first job is still in progress, and the second job was
Expand All @@ -154,7 +155,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {
t.Run("runs immediately", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
if c := j.Done(); c != 1 {
t.Errorf("expected job run once, immediately, got %d", c)
Expand All @@ -165,9 +166,9 @@ func TestChainSkipIfStillRunning(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
time.Sleep(time.Millisecond)
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
}()
time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
if c := j.Done(); c != 2 {
Expand All @@ -180,9 +181,9 @@ func TestChainSkipIfStillRunning(t *testing.T) {
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
time.Sleep(time.Millisecond)
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
}()

// After 5ms, the first job is still in progress, and the second job was
Expand All @@ -206,7 +207,7 @@ func TestChainSkipIfStillRunning(t *testing.T) {
j.delay = 10 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
for i := 0; i < 11; i++ {
go wrappedJob.Run()
go wrappedJob.Run(context.Background())
}
time.Sleep(200 * time.Millisecond)
done := j.Done()
Expand All @@ -223,8 +224,8 @@ func TestChainSkipIfStillRunning(t *testing.T) {
wrappedJob1 := chain.Then(&j1)
wrappedJob2 := chain.Then(&j2)
for i := 0; i < 11; i++ {
go wrappedJob1.Run()
go wrappedJob2.Run()
go wrappedJob1.Run(context.Background())
go wrappedJob2.Run(context.Background())
}
time.Sleep(100 * time.Millisecond)
var (
Expand Down
10 changes: 5 additions & 5 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ScheduleParser interface {

// Job is an interface for submitted cron jobs.
type Job interface {
Run()
Run(ctx context.Context)
}

// Schedule describes a job's duty cycle.
Expand Down Expand Up @@ -131,14 +131,14 @@ func New(opts ...Option) *Cron {
}

// FuncJob is a wrapper that turns a func() into a cron.Job
type FuncJob func()
type FuncJob func(ctx context.Context)

func (f FuncJob) Run() { f() }
func (f FuncJob) Run(ctx context.Context) { f(ctx) }

// AddFunc adds a func to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
func (c *Cron) AddFunc(spec string, cmd func(ctx context.Context)) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
}

Expand Down Expand Up @@ -309,7 +309,7 @@ func (c *Cron) startJob(j Job) {
c.jobWaiter.Add(1)
go func() {
defer c.jobWaiter.Done()
j.Run()
j.Run(context.Background()) // todo: pass context from cron
}()
}

Expand Down
Loading
Loading