forked from freehere107/go-workers
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworker.go
79 lines (66 loc) · 1.62 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package workers
import (
"sync/atomic"
"time"
)
// worker pulls messages from its manager's fetcher and runs the manager's
// job on them, one message at a time, inside a single goroutine (see work).
type worker struct {
	// manager supplies the fetcher, the confirm channel, the job function
	// and the queue name used by this worker.
	manager *manager

	// stop carries the shutdown request sent by quit; exit carries the
	// acknowledgement the work loop sends back just before it returns.
	stop, exit chan bool

	// currentMsg is the message being processed, or nil while idle.
	// NOTE(review): it is written by the work goroutine without
	// synchronization; confirm how readers elsewhere access it.
	currentMsg *Msg

	// startedAt is the Unix time (seconds) at which processing of the
	// current message began, or 0 while idle. Accessed via sync/atomic.
	startedAt int64
}
// start launches the worker's processing loop in a new goroutine, feeding it
// the message channel obtained from the manager's fetcher.
func (w *worker) start() {
	messages := w.manager.fetch.Messages()
	go w.work(messages)
}
// quit asks the work loop to stop and blocks until the loop has confirmed
// that it is exiting. Both channel operations block, so this only returns
// once the work loop has reached its select and taken the stop branch.
func (w *worker) quit() {
	// Request shutdown...
	w.stop <- true

	// ...and wait for the loop's acknowledgement.
	<-w.exit
}
// work is the worker's main loop. On every iteration it waits for whichever
// of the following happens first:
//
//   - a message arrives on messages: it is processed, confirmed to the
//     manager if process reports it should be acknowledged, and counted in
//     the dequeue/processed metrics;
//   - the fetcher accepts a "ready" signal from this worker;
//   - a stop request arrives: the loop acknowledges on w.exit and returns.
func (w *worker) work(messages chan *Msg) {
	for {
		select {
		case msg := <-messages:
			// Mark the worker busy; processing() relies on startedAt > 0.
			atomic.StoreInt64(&w.startedAt, time.Now().Unix())
			w.currentMsg = msg
			TaskDequeueMetric.WithLabelValues(w.manager.queueName()).Inc()

			// The second label is "0" when the message was acknowledged
			// and "1" otherwise.
			status := "1"
			if w.process(msg) {
				w.manager.confirm <- msg
				status = "0"
			}
			TaskProcessedMetric.WithLabelValues(w.manager.queueName(), status).Inc()

			// Back to idle.
			atomic.StoreInt64(&w.startedAt, 0)
			w.currentMsg = nil

			// Best-effort, non-blocking notification that this worker has
			// finished. A fetcher that went to sleep after seeing an empty
			// queue can use it to re-query the queue right away.
			select {
			case w.manager.fetch.FinishedWork() <- true:
			default:
			}

		case w.manager.fetch.Ready() <- true:
			// The fetcher now knows this worker can accept a message.

		case <-w.stop:
			w.exit <- true
			return
		}
	}
}
// process runs the manager's job for message through w.manager.mids.call
// (presumably the middleware chain; confirm against its definition) and
// reports whether the message should be acknowledged.
//
// The result is preset to true and any panic raised during the call is
// recovered and discarded, so a panicking job still yields acknowledge == true.
func (w *worker) process(message *Msg) (acknowledge bool) {
	acknowledge = true
	defer func() { _ = recover() }()

	runJob := func() { w.manager.job(message) }
	acknowledge = w.manager.mids.call(w.manager.queueName(), message, runJob)
	return acknowledge
}
// processing reports whether the worker is currently handling a message,
// i.e. whether startedAt holds a positive start timestamp. The field is read
// atomically.
func (w *worker) processing() bool {
	started := atomic.LoadInt64(&w.startedAt)
	return started > 0
}
// newWorker returns an idle worker bound to m. The stop and exit channels
// are unbuffered; currentMsg and startedAt start at their zero values
// (nil and 0). The worker does nothing until start is called.
func newWorker(m *manager) *worker {
	return &worker{
		manager: m,
		stop:    make(chan bool),
		exit:    make(chan bool),
	}
}