Skip to content

Commit

Permalink
async client
Browse files Browse the repository at this point in the history
  • Loading branch information
ctlove0523 committed Apr 20, 2021
1 parent e48f2f9 commit 56aa419
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 65 deletions.
32 changes: 16 additions & 16 deletions async_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (device *asyncDevice) SendMessage(message Message) AsyncResult {
glog.Info("begin async send message")

messageData := Interface2JsonString(message)
topic := FormatTopic(MessageUpTopic, device.base.Id)
topic := formatTopic(MessageUpTopic, device.base.Id)
glog.Infof("async send message topic is %s", topic)
token := device.base.Client.Publish(topic, device.base.qos, false, messageData)
if token.Wait() && token.Error() != nil {
Expand All @@ -139,7 +139,7 @@ func (device *asyncDevice) ReportProperties(properties DeviceProperties) AsyncRe
go func() {
glog.Info("begin to report properties")
propertiesData := Interface2JsonString(properties)
if token := device.base.Client.Publish(FormatTopic(PropertiesUpTopic, device.base.Id), device.base.qos, false, propertiesData);
if token := device.base.Client.Publish(formatTopic(PropertiesUpTopic, device.base.Id), device.base.qos, false, propertiesData);
token.Wait() && token.Error() != nil {
glog.Warningf("device %s async report properties failed", device.base.Id)
asyncResult.completeError(token.Error())
Expand Down Expand Up @@ -176,7 +176,7 @@ func (device *asyncDevice) BatchReportSubDevicesProperties(service DevicesServic
Devices: service.Devices[begin:end],
}

if token := device.base.Client.Publish(FormatTopic(GatewayBatchReportSubDeviceTopic, device.base.Id), device.base.qos, false, Interface2JsonString(sds));
if token := device.base.Client.Publish(formatTopic(GatewayBatchReportSubDeviceTopic, device.base.Id), device.base.qos, false, Interface2JsonString(sds));
token.Wait() && token.Error() != nil {
glog.Warningf("device %s batch report sub device properties failed", device.base.Id)
loopResult = false
Expand All @@ -199,7 +199,7 @@ func (device *asyncDevice) QueryDeviceShadow(query DevicePropertyQueryRequest, h

go func() {
requestId := uuid.NewV4()
if token := device.base.Client.Publish(FormatTopic(DeviceShadowQueryRequestTopic, device.base.Id)+requestId.String(), device.base.qos, false, Interface2JsonString(query));
if token := device.base.Client.Publish(formatTopic(DeviceShadowQueryRequestTopic, device.base.Id)+requestId.String(), device.base.qos, false, Interface2JsonString(query));
token.Wait() && token.Error() != nil {
glog.Warningf("device %s query device shadow data failed,request id = %s", device.base.Id, requestId)
asyncResult.completeError(token.Error())
Expand Down Expand Up @@ -232,7 +232,7 @@ func (device *asyncDevice) UploadFile(filename string) AsyncResult {
Services: services,
}

if token := device.base.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
if token := device.base.Client.Publish(formatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
token.Wait() && token.Error() != nil {
glog.Warningf("publish file upload request url failed")
asyncResult.completeError(&DeviceError{
Expand Down Expand Up @@ -265,7 +265,7 @@ func (device *asyncDevice) UploadFile(filename string) AsyncResult {
}
glog.Infof("file upload url is %s", device.base.fileUrls[filename+FileActionUpload])

//filename = SmartFileName(filename)
//filename = smartFileName(filename)
uploadFlag := CreateHttpClient().UploadFile(filename, device.base.fileUrls[filename+FileActionUpload])
if !uploadFlag {
glog.Errorf("upload file failed")
Expand All @@ -277,7 +277,7 @@ func (device *asyncDevice) UploadFile(filename string) AsyncResult {

response := CreateFileUploadDownLoadResultResponse(filename, FileActionUpload, uploadFlag)

token := device.base.Client.Publish(FormatTopic(PlatformEventToDeviceTopic, device.base.Id), device.base.qos, false, Interface2JsonString(response))
token := device.base.Client.Publish(formatTopic(PlatformEventToDeviceTopic, device.base.Id), device.base.qos, false, Interface2JsonString(response))
if token.Wait() && token.Error() != nil {
glog.Error("report file upload file result failed")
asyncResult.completeError(token.Error())
Expand Down Expand Up @@ -310,7 +310,7 @@ func (device *asyncDevice) DownloadFile(filename string) AsyncResult {
Services: services,
}

if token := device.base.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
if token := device.base.Client.Publish(formatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
token.Wait() && token.Error() != nil {
glog.Warningf("publish file download request url failed")
asyncResult.completeError(&DeviceError{
Expand Down Expand Up @@ -352,7 +352,7 @@ func (device *asyncDevice) DownloadFile(filename string) AsyncResult {

response := CreateFileUploadDownLoadResultResponse(filename, FileActionDownload, downloadFlag)

token := device.base.Client.Publish(FormatTopic(PlatformEventToDeviceTopic, device.base.Id), device.base.qos, false, Interface2JsonString(response))
token := device.base.Client.Publish(formatTopic(PlatformEventToDeviceTopic, device.base.Id), device.base.qos, false, Interface2JsonString(response))
if token.Wait() && token.Error() != nil {
glog.Error("report file upload file result failed")
asyncResult.completeError(token.Error())
Expand Down Expand Up @@ -386,7 +386,7 @@ func (device *asyncDevice) ReportDeviceInfo(swVersion, fwVersion string) AsyncRe
Services: []ReportDeviceInfoServiceEvent{event},
}

token := device.base.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request))
token := device.base.Client.Publish(formatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request))
if token.Wait() && token.Error() != nil {
asyncResult.completeError(token.Error())
} else {
Expand Down Expand Up @@ -423,7 +423,7 @@ func (device *asyncDevice) ReportLogs(logs []DeviceLogEntry) AsyncResult {

fmt.Println(Interface2JsonString(request))

topic := FormatTopic(DeviceToPlatformTopic, device.base.Id)
topic := formatTopic(DeviceToPlatformTopic, device.base.Id)

token := device.base.Client.Publish(topic, 1, false, Interface2JsonString(request))

Expand Down Expand Up @@ -484,7 +484,7 @@ func (device *asyncDevice) UpdateSubDeviceState(subDevicesStatus SubDevicesStatu
Services: []DataEntry{requestEventService},
}

if token := device.base.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
if token := device.base.Client.Publish(formatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
token.Wait() && token.Error() != nil {
glog.Warningf("gateway %s update sub devices status failed", device.base.Id)
asyncResult.completeError(token.Error())
Expand Down Expand Up @@ -522,7 +522,7 @@ func (device *asyncDevice) DeleteSubDevices(deviceIds []string) AsyncResult {
Services: []DataEntry{requestEventService},
}

if token := device.base.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
if token := device.base.Client.Publish(formatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
token.Wait() && token.Error() != nil {
glog.Warningf("gateway %s delete sub devices request send failed", device.base.Id)
asyncResult.completeError(token.Error())
Expand Down Expand Up @@ -558,7 +558,7 @@ func (device *asyncDevice) AddSubDevices(deviceInfos []DeviceInfo) AsyncResult {
Services: []DataEntry{requestEventService},
}

if token := device.base.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
if token := device.base.Client.Publish(formatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(request));
token.Wait() && token.Error() != nil {
glog.Warningf("gateway %s add sub devices request send failed", device.base.Id)
asyncResult.completeError(token.Error())
Expand Down Expand Up @@ -591,7 +591,7 @@ func (device *asyncDevice) SyncAllVersionSubDevices() AsyncResult {
Services: dataEntries,
}

if token := device.base.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(data));
if token := device.base.Client.Publish(formatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(data));
token.Wait() && token.Error() != nil {
asyncResult.completeError(token.Error())
} else {
Expand Down Expand Up @@ -626,7 +626,7 @@ func (device *asyncDevice) SyncSubDevices(version int) AsyncResult {
Services: dataEntries,
}

if token := device.base.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(data));
if token := device.base.Client.Publish(formatTopic(DeviceToPlatformTopic, device.base.Id), device.base.qos, false, Interface2JsonString(data));
token.Wait() && token.Error() != nil {
glog.Errorf("send sync sub device request failed")
asyncResult.completeError(token.Error())
Expand Down
28 changes: 14 additions & 14 deletions base_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (device *baseIotDevice) Init() bool {
options.AddBroker(device.Servers)
options.SetClientID(assembleClientId(device))
options.SetUsername(device.Id)
options.SetPassword(HmacSha256(device.Password, TimeStamp()))
options.SetPassword(hmacSha256(device.Password, timeStamp()))
options.SetKeepAlive(250 * time.Second)
options.SetAutoReconnect(true)
options.SetConnectRetry(true)
Expand Down Expand Up @@ -252,7 +252,7 @@ func assembleClientId(device *baseIotDevice) string {
segments[0] = device.Id
segments[1] = "0"
segments[2] = "0"
segments[3] = TimeStamp()
segments[3] = timeStamp()

return strings.Join(segments, "_")
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func (device *baseIotDevice) createCommandMqttHandler() func(client mqtt.Client,
ResultCode: 1,
})
}
if token := device.Client.Publish(FormatTopic(CommandResponseTopic, device.Id)+GetTopicRequestId(message.Topic()), 1, false, res);
if token := device.Client.Publish(formatTopic(CommandResponseTopic, device.Id)+getTopicRequestId(message.Topic()), 1, false, res);
token.Wait() && token.Error() != nil {
glog.Infof("device %s send command response failed", device.Id)
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func (device *baseIotDevice) createPropertiesSetMqttHandler() func(client mqtt.C
response.ResultDesc = "Set properties failed."
res = Interface2JsonString(response)
}
if token := device.Client.Publish(FormatTopic(PropertiesSetResponseTopic, device.Id)+GetTopicRequestId(message.Topic()), device.qos, false, res);
if token := device.Client.Publish(formatTopic(PropertiesSetResponseTopic, device.Id)+getTopicRequestId(message.Topic()), device.qos, false, res);
token.Wait() && token.Error() != nil {
glog.Warningf("unmarshal platform properties set request failed,device id = %s,message = %s", device.Id, message)
}
Expand Down Expand Up @@ -366,7 +366,7 @@ func (device *baseIotDevice) createPropertiesQueryMqttHandler() func(client mqtt

queryResult := device.propertyQueryHandler(*propertiesQueryRequest)
responseToPlatform := Interface2JsonString(queryResult)
if token := device.Client.Publish(FormatTopic(PropertiesQueryResponseTopic, device.Id)+GetTopicRequestId(message.Topic()), device.qos, false, responseToPlatform);
if token := device.Client.Publish(formatTopic(PropertiesQueryResponseTopic, device.Id)+getTopicRequestId(message.Topic()), device.qos, false, responseToPlatform);
token.Wait() && token.Error() != nil {
glog.Warningf("device %s send properties query response failed.", device.Id)
}
Expand All @@ -390,47 +390,47 @@ func (device *baseIotDevice) createPropertiesQueryResponseMqttHandler() func(cli

func (device *baseIotDevice) subscribeDefaultTopics() {
// 订阅平台命令下发topic
topic := FormatTopic(CommandDownTopic, device.Id)
topic := formatTopic(CommandDownTopic, device.Id)
if token := device.Client.Subscribe(topic, device.qos, device.createCommandMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscribe platform send command topic %s failed", device.Id, topic)
panic(0)
}

// 订阅平台消息下发的topic
topic = FormatTopic(MessageDownTopic, device.Id)
topic = formatTopic(MessageDownTopic, device.Id)
if token := device.Client.Subscribe(topic, device.qos, device.createMessageMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device % subscribe platform send message topic %s failed.", device.Id, topic)
panic(0)
}

// 订阅平台设置设备属性的topic
topic = FormatTopic(PropertiesSetRequestTopic, device.Id)
topic = formatTopic(PropertiesSetRequestTopic, device.Id)
if token := device.Client.Subscribe(topic, device.qos, device.createPropertiesSetMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscribe platform set properties topic %s failed", device.Id, topic)
panic(0)
}

// 订阅平台查询设备属性的topic
topic = FormatTopic(PropertiesQueryRequestTopic, device.Id)
topic = formatTopic(PropertiesQueryRequestTopic, device.Id)
if token := device.Client.Subscribe(topic, device.qos, device.createPropertiesQueryMqttHandler())
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscriber platform query device properties topic failed %s", device.Id, topic)
panic(0)
}

// 订阅查询设备影子响应的topic
topic = FormatTopic(DeviceShadowQueryResponseTopic, device.Id)
topic = formatTopic(DeviceShadowQueryResponseTopic, device.Id)
if token := device.Client.Subscribe(topic, device.qos, device.createPropertiesQueryResponseMqttHandler());
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscribe query device shadow topic %s failed", device.Id, topic)
panic(0)
}

// 订阅平台下发到设备的事件
topic = FormatTopic(PlatformEventToDeviceTopic, device.Id)
topic = formatTopic(PlatformEventToDeviceTopic, device.Id)
if token := device.Client.Subscribe(topic, device.qos, device.handlePlatformToDeviceData());
token.Wait() && token.Error() != nil {
glog.Warningf("device %s subscribe query device shadow topic %s failed", device.Id, topic)
Expand Down Expand Up @@ -598,7 +598,7 @@ func (device *baseIotDevice) reportLogs(logs []DeviceLogEntry) {
}

reportedLog := Interface2JsonString(data)
device.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.Id), 0, false, reportedLog)
device.Client.Publish(formatTopic(DeviceToPlatformTopic, device.Id), 0, false, reportedLog)
}

func (device *baseIotDevice) reportVersion() {
Expand All @@ -620,7 +620,7 @@ func (device *baseIotDevice) reportVersion() {
Services: []DataEntry{dataEntry},
}

device.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.Id), device.qos, false, Interface2JsonString(data))
device.Client.Publish(formatTopic(DeviceToPlatformTopic, device.Id), device.qos, false, Interface2JsonString(data))
}

func (device *baseIotDevice) upgradeDevice(upgradeType byte, upgradeInfo *UpgradeInfo) {
Expand All @@ -636,7 +636,7 @@ func (device *baseIotDevice) upgradeDevice(upgradeType byte, upgradeInfo *Upgrad
Services: []DataEntry{dataEntry},
}

if token := device.Client.Publish(FormatTopic(DeviceToPlatformTopic, device.Id), device.qos, false, Interface2JsonString(data));
if token := device.Client.Publish(formatTopic(DeviceToPlatformTopic, device.Id), device.qos, false, Interface2JsonString(data));
token.Wait() && token.Error() != nil {
glog.Errorf("device %s upgrade failed,type %d", device.Id, upgradeType)
}
Expand Down
Loading

0 comments on commit 56aa419

Please sign in to comment.