Skip to content

Commit 2686604

Browse files
committed
=gomod
1 parent ac650b1 commit 2686604

File tree

3 files changed

+23
-0
lines changed

3 files changed

+23
-0
lines changed

go.mod

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module github.com/liangdas/armyant
2+
3+
go 1.13
4+
5+
require (
6+
github.com/eclipse/paho.mqtt.golang v1.2.0
7+
golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933
8+
)

go.sum

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
2+
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
3+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
4+
golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933 h1:e6HwijUxhDe+hPNjZQQn9bA5PW3vNmnN64U2ZW759Lk=
5+
golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
6+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
7+
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
8+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

work/mqttwork.go

+7
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@ import (
1717
"fmt"
1818
MQTT "github.com/eclipse/paho.mqtt.golang"
1919
"strings"
20+
"sync"
2021
_ "time"
2122
"net/url"
2223
)
2324

2425
type MqttWork struct {
2526
client MQTT.Client
2627
waiting_queue map[string]func(client MQTT.Client, msg MQTT.Message)
28+
lock *sync.Mutex
2729
curr_id int64
2830
}
2931

3032
func (this *MqttWork) GetDefaultOptions(addrURI string) *MQTT.ClientOptions {
3133
this.curr_id = 0
34+
this.lock=new(sync.Mutex)
3235
this.waiting_queue = make(map[string]func(client MQTT.Client, msg MQTT.Message))
3336
opts := MQTT.NewClientOptions()
3437
opts.AddBroker(addrURI)
@@ -40,6 +43,7 @@ func (this *MqttWork) GetDefaultOptions(addrURI string) *MQTT.ClientOptions {
4043
opts.SetAutoReconnect(false)
4144
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
4245
//收到消息
46+
this.lock.Lock()
4347
if callback, ok := this.waiting_queue[msg.Topic()]; ok {
4448
//有等待消息的callback 还缺一个信息超时的处理机制
4549
_, err := url.Parse(msg.Topic())
@@ -52,6 +56,7 @@ func (this *MqttWork) GetDefaultOptions(addrURI string) *MQTT.ClientOptions {
5256
}
5357
go callback(client, msg)
5458
}
59+
this.lock.Unlock()
5560
})
5661
return opts
5762
}
@@ -147,5 +152,7 @@ func (this *MqttWork) RequestNR(topic string, body []byte) {
147152
*/
148153
func (this *MqttWork) On(topic string, callback func(client MQTT.Client, msg MQTT.Message)) {
149154
////服务器不会返回结果
155+
this.lock.Lock()
150156
this.waiting_queue[topic] = callback //添加这条消息到等待队列
157+
this.lock.Unlock()
151158
}

0 commit comments

Comments
 (0)