-
Notifications
You must be signed in to change notification settings - Fork 7
/
batch_send_message.go
115 lines (96 loc) · 2.79 KB
/
batch_send_message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package alimns
import (
"context"
"encoding/xml"
"errors"
"fmt"
"net/http"
"time"
"go.uber.org/zap"
)
// SendMessage 发送消息
type SendMessage struct {
XMLName xml.Name `xml:"Message"`
XMLNs string `xml:"xmlns,attr"`
ErrorCode string `xml:"ErrorCode"`
ErrorMessage string `xml:"ErrorMessage"`
MessageID string `xml:"MessageId"`
MessageBodyMD5 string `xml:"MessageBodyMD5"`
ReceiptHandle string `xml:"ReceiptHandle"` // 发送延时消息才有返回
}
// BatchSendMessageResponse 批量发送消息回复
type BatchSendMessageResponse struct {
XMLName xml.Name `xml:"Messages"`
XMLNs string `xml:"xmlns,attr"`
SendMessages []*SendMessage `xml:"Message"`
}
// BatchSendMessage 批量发送消息
func (c *Client) BatchSendMessage(name string, messageList ...*Message) (*BatchSendMessageResponse, error) {
if len(messageList) > 16 {
return nil, batchSendMessageNumLimitError
}
var (
try = 0
sendedMessages = make([]*SendMessage, len(messageList))
roundIndexList []int
err error
)
originalMessageList := messageList
for idx := range originalMessageList {
roundIndexList = append(roundIndexList, idx)
}
start:
requestLine := fmt.Sprintf(mnsBatchSendMessage, name)
req := c.ca.NewRequest().Post().WithPath(requestLine).WithXMLBody(&messageList).WithTimeout(apiTimeout)
body, err := req.ReqBody()
if err != nil {
return nil, err
}
resp, err := c.ca.Do(context.TODO(), req)
if err != nil {
return nil, err
}
try++
switch resp.StatusCode() {
case http.StatusCreated,
http.StatusInternalServerError:
var batchSendMessageResponse BatchSendMessageResponse
err = resp.DecodeFromXML(&batchSendMessageResponse)
if err != nil {
return nil, err
}
var retryIdx []int
for seq, sendMessage := range batchSendMessageResponse.SendMessages {
idx := roundIndexList[seq]
switch sendMessage.ErrorCode {
case "":
sendedMessages[idx] = sendMessage
case internalError.Error():
retryIdx = append(retryIdx, idx)
default:
c.logger.Warn(fmt.Sprintf("fail to call BatchSendMessage for %s", name), zap.Error(err), zap.String("body", string(body)))
}
}
if len(retryIdx) == 0 {
batchSendMessageResponse.SendMessages = sendedMessages
return &batchSendMessageResponse, nil
}
roundIndexList = retryIdx
var retryMessageList []*Message
for _, idx := range retryIdx {
retryMessageList = append(retryMessageList, originalMessageList[idx])
}
messageList = retryMessageList
if try > 4 {
return nil, batchSendMessageTryLimitError
}
time.Sleep(100 * time.Millisecond * time.Duration(try))
goto start
default:
var respErr RespErr
if err := resp.DecodeFromXML(&respErr); err != nil {
return nil, err
}
return nil, errors.New(respErr.Message)
}
}