-
Notifications
You must be signed in to change notification settings - Fork 0
/
cron.go
81 lines (63 loc) · 1.1 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package cron
import (
"context"
"fmt"
"time"
)
type (
Job func(context.Context) bool
TermCh <-chan struct{}
)
func Start(ctx context.Context, job Job, opt ...Option) (TermCh, error) {
s := &settings{}
for _, o := range opt {
o.apply(s)
}
if s.interval == 0 {
return nil, fmt.Errorf("invalid interval: it can't be zero")
}
term := make(chan struct{})
go func() {
defer close(term)
wait(ctx, s.lead)
repeat(ctx, s.interval, job)
}()
return term, nil
}
func wait(ctx context.Context, period time.Duration) {
if period == 0 {
return
}
timer := time.NewTimer(period)
select {
case <-ctx.Done():
stopAndDrain(timer)
case <-timer.C:
}
}
func repeat(ctx context.Context, internval time.Duration, job Job) {
jobCtx, cancel := context.WithCancel(ctx)
defer cancel()
ticker := time.NewTicker(internval)
for {
select {
case <-jobCtx.Done():
return
case <-ticker.C:
go func() {
if cont := job(jobCtx); !cont {
cancel()
}
}()
}
}
}
func stopAndDrain(t *time.Timer) {
fmt.Println("stop and drain")
if t == nil {
return
}
if !t.Stop() {
<-t.C
}
}