-
Notifications
You must be signed in to change notification settings - Fork 7
/
queue.go
82 lines (72 loc) · 1.81 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package alimns
import (
"context"
"runtime"
"github.com/xiaojiaoyu100/curlew"
)
// M is the message content. The other message fields are left out on purpose
// so that callers do not come to depend on them; depend on database fields
// instead.
type M struct {
QueueName string // queue name
MessageID string // message ID; prefer a business-level ID over depending on this one
MessageBody string // message body
EnqueueTime int64 // enqueue time
ReceiptHandle string // message receipt handle
codec Codec // used by Decode to parse MessageBody
}
// Decode parses MessageBody into v using the message's codec.
//
// v is handed to the codec unchanged, so it must be whatever the codec accepts
// as a decode target (typically a non-nil pointer). The codec's error is
// returned as is.
func (m *M) Decode(v interface{}) error {
	// Forward v itself rather than &v: &v is a *interface{}, which hides the
	// caller's concrete target type from the codec and lets a nil or
	// non-pointer v be "decoded" into a local copy without any error.
	return m.codec.Decode([]byte(m.MessageBody), v)
}
// Handle is the template (signature) for a message handling function.
type Handle func(ctx context.Context) error
// Queue is a message queue.
type Queue struct {
Name string
Parallel int // requested parallelism; safeParallel caps it and picks a CPU-based default when it is below 1
AttributeSetters []QueueAttributeSetter
Builder
PullWait bool // wait until messages have been consumed before pulling more
codec Codec
before Before
after After
isScheduled bool // checked and cleared by Stop
receiveMessageChan chan *ReceiveMessage
longPollQuit chan struct{} // closed by Stop
consumeQuit chan struct{} // closed by Stop
dispatcher *curlew.Dispatcher
popCount int32 // compared against safeParallel by busy and safePullNumOfMessages
}
// Stop makes the queue stop pulling and consuming messages by closing its
// two quit channels. It is a no-op when the queue is not marked as scheduled.
//
// NOTE(review): the isScheduled check is not synchronised here; confirm that
// callers do not invoke Stop concurrently.
func (q *Queue) Stop() {
	if !q.isScheduled {
		return
	}
	// Clear the flag before closing so a later call does not close the
	// channels a second time.
	q.isScheduled = false
	close(q.longPollQuit)
	close(q.consumeQuit)
}
// safeParallel returns the parallelism to use for this queue.
//
// A Parallel value above maxReceiveMessage is capped at maxReceiveMessage and
// a value from 1 up to that cap is used as is. Any other value falls back to
// twice the CPU count, capped at maxReceiveMessage and never less than 1.
func (q *Queue) safeParallel() int {
	switch p := q.Parallel; {
	case p > maxReceiveMessage:
		return maxReceiveMessage
	case p >= 1:
		return p
	}

	// Parallel was not set to a usable value; derive a default from the host.
	workers := 2 * runtime.NumCPU()
	if workers > maxReceiveMessage {
		return maxReceiveMessage
	}
	if workers <= 0 {
		return 1
	}
	return workers
}
// busy reports whether popCount has reached the parallelism returned by
// safeParallel.
func (q *Queue) busy() bool {
	limit := q.safeParallel()
	return int(q.popCount) >= limit
}
// safePullNumOfMessages returns the difference between safeParallel and
// popCount, with a floor of 1.
func (q *Queue) safePullNumOfMessages() int {
	if free := q.safeParallel() - int(q.popCount); free >= 1 {
		return free
	}
	return 1
}