Skip to content

Commit

Permalink
增加日志
Browse files Browse the repository at this point in the history
  • Loading branch information
ctlove0523 committed Dec 20, 2020
1 parent e521dee commit 71e032d
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 68 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.15

require (
github.com/eclipse/paho.mqtt.golang v1.3.0
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/satori/go.uuid v1.2.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/eclipse/paho.mqtt.golang v1.3.0 h1:MU79lqr3FKNKbSrGN7d7bNYqh8MwWW7Zcx0iG+VIw9I=
github.com/eclipse/paho.mqtt.golang v1.3.0/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
Expand Down
146 changes: 78 additions & 68 deletions iotdevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package iot

import (
"encoding/json"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/golang/glog"
"github.com/satori/go.uuid"
"strings"
"time"
)

type Device interface {
Expand Down Expand Up @@ -38,7 +39,7 @@ func (device *iotDevice) createMessageMqttHandler() func(client mqtt.Client, mes
messageHandler := func(client mqtt.Client, message mqtt.Message) {
msg := &Message{}
if json.Unmarshal(message.Payload(), msg) != nil {
fmt.Println("unmarshal device message failed")
glog.Warningf("unmarshal device message failed,device id = %s,message = %s", device.Id, message)
}

for _, handler := range device.messageHandlers {
Expand All @@ -53,7 +54,7 @@ func (device *iotDevice) createCommandMqttHandler() func(client mqtt.Client, mes
commandHandler := func(client mqtt.Client, message mqtt.Message) {
command := &Command{}
if json.Unmarshal(message.Payload(), command) != nil {
fmt.Println("unmarshal failed")
glog.Warningf("unmarshal platform command failed,device id = %s,message = %s", device.Id, message)
}

handleFlag := true
Expand All @@ -62,13 +63,15 @@ func (device *iotDevice) createCommandMqttHandler() func(client mqtt.Client, mes
}
var res string
if handleFlag {
glog.Infof("device %s handle command success", device.Id)
res = Interface2JsonString(SuccessIotCommandResponse())
} else {
glog.Warningf("device %s handle command failed", device.Id)
res = Interface2JsonString(FailedIotCommandResponse())
}
if token := device.client.Publish(device.topics[CommandResponseTopicName]+GetTopicRequestId(message.Topic()), 1, false, res);
token.Wait() && token.Error() != nil {
fmt.Println("send command response failed")
glog.Infof("device %s send command response failed", device.Id)
}
}

Expand All @@ -79,7 +82,7 @@ func (device *iotDevice) createPropertiesSetMqttHandler() func(client mqtt.Clien
propertiesSetHandler := func(client mqtt.Client, message mqtt.Message) {
propertiesSetRequest := &DevicePropertyDownRequest{}
if json.Unmarshal(message.Payload(), propertiesSetRequest) != nil {
fmt.Println("unmarshal failed")
glog.Warningf("unmarshal platform properties set request failed,device id = %s,message = %s", device.Id, message)
}

handleFlag := true
Expand All @@ -95,7 +98,7 @@ func (device *iotDevice) createPropertiesSetMqttHandler() func(client mqtt.Clien
}
if token := device.client.Publish(device.topics[PropertiesSetResponseTopicName]+GetTopicRequestId(message.Topic()), 1, false, res);
token.Wait() && token.Error() != nil {
fmt.Println("send properties set response failed")
glog.Warningf("unmarshal platform properties set request failed,device id = %s,message = %s", device.Id, message)
}
}

Expand All @@ -106,14 +109,14 @@ func (device *iotDevice) createPropertiesQueryMqttHandler() func(client mqtt.Cli
propertiesQueryHandler := func(client mqtt.Client, message mqtt.Message) {
propertiesQueryRequest := &DevicePropertyQueryRequest{}
if json.Unmarshal(message.Payload(), propertiesQueryRequest) != nil {
fmt.Println("unmarshal failed")
glog.Warningf("device %s unmarshal properties query request failed %s", device.Id, message)
}

queryResult := device.propertyQueryHandler(*propertiesQueryRequest)
responseToPlatform := Interface2JsonString(queryResult)
if token := device.client.Publish(device.topics[PropertiesQueryResponseTopicName]+GetTopicRequestId(message.Topic()), 1, false, responseToPlatform);
token.Wait() && token.Error() != nil {
fmt.Println("send properties set response failed")
glog.Warningf("device %s send properties query response failed.", device.Id)
}
}

Expand All @@ -124,7 +127,7 @@ func (device *iotDevice) createPropertiesQueryResponseMqttHandler() func(client
propertiesQueryResponseHandler := func(client mqtt.Client, message mqtt.Message) {
propertiesQueryResponse := &DevicePropertyQueryResponse{}
if json.Unmarshal(message.Payload(), propertiesQueryResponse) != nil {
fmt.Println("unmarshal failed")
glog.Warningf("device %s unmarshal property response failed,message %s", device.Id, Interface2JsonString(message))
}
device.propertiesQueryResponseHandler(*propertiesQueryResponse)
}
Expand All @@ -142,51 +145,15 @@ func (device *iotDevice) Init() bool {
device.client = mqtt.NewClient(options)

if token := device.client.Connect(); token.Wait() && token.Error() != nil {
fmt.Printf("IoT device init failed,caulse %s\n", token.Error())
glog.Warningf("device %s init failed", device.Id)
return false
}

device.subscribeDefaultTopics()

return true

}

func (device *iotDevice) subscribeDefaultTopics() {
// 订阅平台命令下发topic
if token := device.client.Subscribe(device.topics[CommandDownTopicName], 2, device.createCommandMqttHandler());
token.Wait() && token.Error() != nil {
fmt.Println("subscribe command down topic failed")
panic(0)
}

// 订阅平台消息下发的topic
if token := device.client.Subscribe(device.topics[MessageDownTopicName], 2, device.createMessageMqttHandler());
token.Wait() && token.Error() != nil {
fmt.Println("subscribe message down topic failed")
panic(0)
}

// 订阅平台设置设备属性的topic
if token := device.client.Subscribe(device.topics[PropertiesSetRequestTopicName], 2, device.createPropertiesSetMqttHandler());
token.Wait() && token.Error() != nil {
fmt.Println("subscribe properties set topic failed")
panic(0)
}

// 订阅平台查询设备属性的topic
if token := device.client.Subscribe(device.topics[PropertiesQueryRequestTopicName], 2, device.createPropertiesQueryMqttHandler());
token.Wait() && token.Error() != nil {
fmt.Println("subscribe properties query topic failed")
panic(0)
}
go logFlush()

// 订阅查询设备影子响应的topic
if token := device.client.Subscribe(device.topics[DeviceShadowQueryResponseTopicName], 2, device.createPropertiesQueryResponseMqttHandler());
token.Wait() && token.Error() != nil {
fmt.Println("subscribe query device shadow datafailed")
panic(0)
}
return true

}

Expand All @@ -201,7 +168,7 @@ func (device *iotDevice) SendMessage(message Message) bool {
messageData := Interface2JsonString(message)
if token := device.client.Publish(device.topics[MessageUpTopicName], 2, false, messageData);
token.Wait() && token.Error() != nil {
fmt.Println("send message failed")
glog.Warningf("device %s send messagefailed", device.Id)
return false
}

Expand All @@ -212,26 +179,25 @@ func (device *iotDevice) ReportProperties(properties ServiceProperty) bool {
propertiesData := Interface2JsonString(properties)
if token := device.client.Publish(device.topics[PropertiesUpTopicName], 2, false, propertiesData);
token.Wait() && token.Error() != nil {
fmt.Println("report properties failed")
glog.Warningf("device %s report properties failed", device.Id)
return false
}
return true
}

func (device *iotDevice) BatchReportSubDevicesProperties(service DevicesService) {
if token:=device.client.Publish(device.topics[GatewayBatchReportSubDeviceTopicName],2,false, Interface2JsonString(service));
if token := device.client.Publish(device.topics[GatewayBatchReportSubDeviceTopicName], 2, false, Interface2JsonString(service));
token.Wait() && token.Error() != nil {
fmt.Println("batch report sub device properties failed")
glog.Warningf("device %s batch report sub device properties failed", device.Id)
}
}

func (device *iotDevice) QueryDeviceShadow(query DevicePropertyQueryRequest, handler DevicePropertyQueryResponseHandler) {
device.propertiesQueryResponseHandler = handler
requestId := uuid.NewV4()
fmt.Println(requestId)
if token := device.client.Publish(device.topics[DeviceShadowQueryRequestTopicName]+requestId.String(), 2, false, Interface2JsonString(query));
token.Wait() && token.Error() != nil {
fmt.Println("query device shadow data failed")
glog.Warningf("device %s query device shadow data failed,request id = %s", device.Id, requestId)
}
}

Expand Down Expand Up @@ -261,20 +227,6 @@ func (device *iotDevice) SetPropertyQueryHandler(handler DevicePropertyQueryHand
device.propertyQueryHandler = handler
}

//func (device *iotDevice) SetPropertiesQueryResponseHandler(handler samples.IotDevicePropertyQueryResponseHandler) {
// device.propertiesQueryResponseHandler = handler
//}

func assembleClientId(device *iotDevice) string {
segments := make([]string, 4)
segments[0] = device.Id
segments[1] = "0"
segments[2] = "0"
segments[3] = TimeStamp()

return strings.Join(segments, "_")
}

func CreateIotDevice(id, password, servers string) Device {
device := &iotDevice{}
device.Id = id
Expand All @@ -299,3 +251,61 @@ func CreateIotDevice(id, password, servers string) Device {
device.topics[GatewayBatchReportSubDeviceTopicName] = FormatTopic(GatewayBatchReportSubDeviceTopic, id)
return device
}

func assembleClientId(device *iotDevice) string {
segments := make([]string, 4)
segments[0] = device.Id
segments[1] = "0"
segments[2] = "0"
segments[3] = TimeStamp()

return strings.Join(segments, "_")
}

func logFlush() {
ticker := time.Tick(5 * time.Second)
for{
select {
case <-ticker:
glog.Flush()
}
}
}

func (device *iotDevice) subscribeDefaultTopics() {
// 订阅平台命令下发topic
if token := device.client.Subscribe(device.topics[CommandDownTopicName], 2, device.createCommandMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscribe platform send command topic %s failed", device.Id, device.topics[CommandDownTopicName])
panic(0)
}

// 订阅平台消息下发的topic
if token := device.client.Subscribe(device.topics[MessageDownTopicName], 2, device.createMessageMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device % subscribe platform send message topic %s failed.", device.Id, device.topics[MessageDownTopicName])
panic(0)
}

// 订阅平台设置设备属性的topic
if token := device.client.Subscribe(device.topics[PropertiesSetRequestTopicName], 2, device.createPropertiesSetMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscribe platform set properties topic %s failed", device.Id, device.topics[PropertiesSetRequestTopicName])
panic(0)
}

// 订阅平台查询设备属性的topic
if token := device.client.Subscribe(device.topics[PropertiesQueryRequestTopicName], 2, device.createPropertiesQueryMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscriber platform query device properties topic failed %s", device.Id, device.topics[PropertiesQueryRequestTopicName])
panic(0)
}

// 订阅查询设备影子响应的topic
if token := device.client.Subscribe(device.topics[DeviceShadowQueryResponseTopicName], 2, device.createPropertiesQueryResponseMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscribe query device shadow topic %s failed", device.Id, device.topics[DeviceShadowQueryResponseTopicName])
panic(0)
}

}
17 changes: 17 additions & 0 deletions samples/loop_demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"fmt"
"time"
)

func main() {
ticker := time.Tick(5 * time.Second)
for ; ; {
select {
case <-ticker:
fmt.Println(time.Now().Second())

}
}
}

0 comments on commit 71e032d

Please sign in to comment.