-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathbroker.go
87 lines (73 loc) · 1.64 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package rabbitmq
import (
"github.com/gojuukaze/YTask/v3/brokers"
"github.com/gojuukaze/YTask/v3/message"
"github.com/gojuukaze/YTask/v3/util/yjson"
"github.com/gojuukaze/YTask/v3/yerrors"
)
type Broker struct {
client *Client
host string
port string
user string
password string
vhost string
poolSize int
}
func NewRabbitMqBroker(host, port, user, password, vhost string, poolSize int) Broker {
return Broker{
host: host,
port: port,
password: password,
user: user,
vhost: vhost,
poolSize: poolSize,
}
}
func (r *Broker) Activate() {
r.client = NewRabbitMqClient(r.host, r.port, r.user, r.password, r.vhost, r.poolSize)
}
func (r *Broker) SetPoolSize(n int) {
r.poolSize = n
}
func (r *Broker) GetPoolSize() int {
return r.poolSize
}
func (r *Broker) Next(queueName string) (message.Message, error) {
var msg message.Message
b, err := r.client.Get(queueName)
if err != nil {
if err == AMQPNil {
err = yerrors.ErrEmptyQueue{}
}
return msg, err
}
err = yjson.YJson.Unmarshal(b, &msg)
return msg, err
}
func (r *Broker) Send(queueName string, msg message.Message) error {
b, err := yjson.YJson.Marshal(msg)
if err != nil {
return err
}
err = r.client.Publish(queueName, b, 0)
return err
}
func (r *Broker) LSend(queueName string, msg message.Message) error {
b, err := yjson.YJson.Marshal(msg)
if err != nil {
return err
}
err = r.client.Publish(queueName, b, 5)
return err
}
func (r Broker) Clone() brokers.BrokerInterface {
return &Broker{
host: r.host,
port: r.port,
password: r.password,
user: r.user,
vhost: r.vhost,
poolSize: r.poolSize,
}
}