-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob_manager.go
112 lines (92 loc) · 2.7 KB
/
job_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package gojm
import (
"context"
"sync"
"time"
priorityqueue "github.com/xybor-x/priority_queue"
)
// JobManager allows to schedule job based on its priority.
type JobManager struct {
hook func(ctx context.Context, job JobWrapper)
queue *priorityqueue.PriorityQueue[*Job]
}
// New initialized a JobManager.
func New() *JobManager {
return &JobManager{
hook: nil,
queue: priorityqueue.Default[*Job](),
}
}
// AddPriority sets a new Priority to JobManager.
func (m *JobManager) AddPriority(p Priority) {
if err := m.queue.SetPriority(p, p.value); err != nil {
panic(err)
}
if p.agingTimeSlice != nil {
if err := m.queue.SetAgingTimeSlice(p, *p.agingTimeSlice); err != nil {
panic(err)
}
}
}
// SetDefaultJobAging sets a timeslice. When the job has existed for more than
// this timeslice, it will be moved to the higher priority. This timeslice is
// only applied when the priority hasn't its own aging.
func (m *JobManager) SetDefaultJobAging(timeslice time.Duration) {
if err := m.queue.SetDefaultAgingTimeSlice(timeslice); err != nil {
panic(err)
}
}
// RefreshEvery sets an interval which refreshes the priority of jobs every time
// interval passed. If you do not call this method, this interval will be chosen
// automatically (equal to the least aging timeslice of all priorities).
func (m *JobManager) RefreshEvery(interval time.Duration) {
m.queue.SetAgingInterval(interval)
}
// Schedule adds a Job to JobManager, the job will be scheduled to execute
// later.
func (m *JobManager) Schedule(priority Priority, job *Job) error {
return m.queue.Enqueue(priority, job)
}
// Run starts the JobManager. The parameter numThreads specifies the number of
// Jobs which could be executed concurrently.
func (m *JobManager) Run(ctx context.Context, numThreads int) error {
wg := sync.WaitGroup{}
wg.Add(numThreads)
var finalErr error
var mutex sync.Mutex
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for i := 0; i < numThreads; i++ {
go func() {
defer wg.Done()
if err := m.RunOne(ctx); err != nil {
mutex.Lock()
if finalErr == nil {
finalErr = err
cancel()
}
mutex.Unlock()
}
}()
}
wg.Wait()
return finalErr
}
// RunOne starts the JobManager which only one Job could be executed at a time.
func (m *JobManager) RunOne(ctx context.Context) error {
for {
job, err := m.queue.WaitDequeue(ctx)
if err != nil {
return err
}
job.To().Exec(ctx)
if m.hook != nil {
m.hook(ctx, wrapJob(job))
}
}
}
// Hook sets a trigger function to be executed when the job has just completed.
// This method should be called before calling Run() or RunOne().
func (m *JobManager) Hook(trigger func(ctx context.Context, job JobWrapper)) {
m.hook = trigger
}