-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoptions.go
261 lines (236 loc) · 8.75 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/*
* Copyright (c) 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
*/
package mqtt
import (
"crypto/tls"
"net/url"
"os"
"time"
)
// MessageHandler is a callback type which can be set to be
// executed upon the arrival of messages published to topics
// to which the client is subscribed.
type MessageHandler func(client *MqttClient, message Message)
// OnConnectionLost is a callback type which can be set to be
// executed upon an unintended disconnection from the MQTT broker.
// Disconnects caused by calling Disconnect or ForceDisconnect will
// not cause an OnConnectionLost callback to execute.
type OnConnectionLost func(client *MqttClient, reason error)
// ClientOptions contains configurable options for an MqttClient.
type ClientOptions struct {
server *url.URL
server2 *url.URL
clientId string
username string
password string
cleanses bool
order bool
will_enabled bool
will_topic string
will_payload []byte
will_qos QoS
will_retained bool
maxinflight uint
tlsconfig *tls.Config
timeout uint
store Store
tracefile *os.File
tracelevel tracelevel
msgRouter *router
stopRouter chan bool
pubChanZero chan *Message
pubChanOne chan *Message
pubChanTwo chan *Message
onconnlost OnConnectionLost
mids messageIds
writeTimeout time.Duration
}
// NewClientClientOptions will create a new ClientClientOptions type with some
// default values.
// Port: 1883
// CleanSession: True
// Timeout: 30 (seconds)
// Tracefile: os.Stdout
func NewClientOptions() *ClientOptions {
o := &ClientOptions{
server: nil,
server2: nil,
clientId: "",
username: "",
password: "",
cleanses: true,
order: true,
will_enabled: false,
will_topic: "",
will_payload: nil,
will_qos: QOS_ZERO,
will_retained: false,
maxinflight: 10,
tlsconfig: nil,
store: nil,
timeout: 30,
tracefile: os.Stdout,
tracelevel: Verbose,
pubChanZero: nil,
pubChanOne: nil,
pubChanTwo: nil,
onconnlost: DefaultErrorHandler,
mids: messageIds{index: make(map[MId]bool)},
writeTimeout: 0, // 0 represents timeout disabled
}
o.msgRouter, o.stopRouter = newRouter()
return o
}
// SetBroker will allow you to set the URI for your broker. The format should be
// scheme://host:port
// Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)
// and "port" is the port on which the broker is accepting connections.
// For example, one could connect to tcp://test.mosquitto.org:1883
func (opts *ClientOptions) SetBroker(server string) *ClientOptions {
opts.server, _ = url.Parse(server)
return opts
}
// SetStandbyBroker will allow you to set a second URI to which the client will attempt
// to connect in the event of a connection failure. This is for use only in cases where
// two brokers are configured as a highly available pair. (For example, two IBM MessageSight
// appliances configured in High Availability mode).
func (opts *ClientOptions) SetStandbyBroker(server string) *ClientOptions {
opts.server2, _ = url.Parse(server)
return opts
}
// SetClientId will set the client id to be used by this client when
// connecting to the MQTT broker. According to the MQTT v3.1 specification,
// a client id mus be no longer than 23 characters.
func (opts *ClientOptions) SetClientId(clientid string) *ClientOptions {
opts.clientId = clientid
return opts
}
// SetUsername will set the username to be used by this client when connecting
// to the MQTT broker. Note: without the use of SSL/TLS, this information will
// be sent in plaintext accross the wire.
func (opts *ClientOptions) SetUsername(username string) *ClientOptions {
opts.username = username
return opts
}
// SetPassword will set the password to be used by this client when connecting
// to the MQTT broker. Note: without the use of SSL/TLS, this information will
// be sent in plaintext accross the wire.
func (opts *ClientOptions) SetPassword(password string) *ClientOptions {
opts.password = password
return opts
}
// SetCleanSession will set the "clean session" flag in the connect message
// when this client connects to an MQTT broker. By setting this flag, you are
// indicating that no messages saved by the broker for this client should be
// delivered. Any messages that were going to be sent by this client before
// diconnecting previously but didn't will not be sent upon connecting to the
// broker.
func (opts *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
opts.cleanses = clean
return opts
}
// SetOrderMatters will set the message routing to guarantee order within
// each QoS level. By default, this value is true. If set to false,
// this flag indicates that messages can be delivered asynchronously
// from the client to the application and possibly arrive out of order.
func (opts *ClientOptions) SetOrderMatters(order bool) *ClientOptions {
opts.order = order
return opts
}
// SetMaxInFlight will set a limit on the maximum number of "in-flight" messages
// going from the client to the server. This setting is currently ignored.
// func (opts *ClientOptions) SetMaxInFlight(max uint) *ClientOptions {
// opts.maxinflight = max
// return opts
// }
// SetTlsConfig will set an SSL/TLS configuration to be used when connecting
// to an MQTT broker. Please read the official Go documentation for more
// information.
func (opts *ClientOptions) SetTlsConfig(tlsconfig *tls.Config) *ClientOptions {
opts.tlsconfig = tlsconfig
return opts
}
// SetStore will set the implementation of the Store interface
// used to provide message persistence in cases where QoS levels
// QoS_ONE or QoS_TWO are used. If no store is provided, then the
// client will use MemoryStore by default.
func (opts *ClientOptions) SetStore(store Store) *ClientOptions {
opts.store = store
return opts
}
// SetTimeout will set the amount of time (in seconds) that the client
// should wait before sending a PING request to the broker. This will
// allow the client to know that a connection has not been lost with the
// server.
func (opts *ClientOptions) SetTimeout(timeout uint) *ClientOptions {
opts.timeout = timeout
return opts
}
// UnsetWill will cause any set will message to be disregarded.
func (opts *ClientOptions) UnsetWill() *ClientOptions {
opts.will_enabled = false
return opts
}
// SetWill accepts a string will message to be set. When the client connects,
// it will give this will message to the broker, which will then publish the
// provided payload (the will) to any clients that are subscribed to the provided
// topic.
func (opts *ClientOptions) SetWill(topic string, payload string, qos QoS, retained bool) *ClientOptions {
opts.SetBinaryWill(topic, []byte(payload), qos, retained)
return opts
}
// SetBinaryWill accepts a []byte will message to be set. When the client connects,
// it will give this will message to the broker, which will then publish the
// provided payload (the will) to any clients that are subscribed to the provided
// topic.
func (opts *ClientOptions) SetBinaryWill(topic string, payload []byte, qos QoS, retained bool) *ClientOptions {
opts.will_enabled = true
opts.will_topic = topic
opts.will_payload = payload
opts.will_qos = qos
opts.will_retained = retained
return opts
}
// SetTracefile will set the output for any trace statements that are generated
// by the client. By default, trace statements will be directed to os.Stdout.
func (opts *ClientOptions) SetTracefile(tracefile *os.File) *ClientOptions {
opts.tracefile = tracefile
return opts
}
// SetTraceLevel will set the trace level (verbosity) of the client.
// Options are:
// Off
// Critical
// Warn
// Verbose
func (opts *ClientOptions) SetTraceLevel(level tracelevel) *ClientOptions {
opts.tracelevel = level
return opts
}
// SetDefaultPublishHandler
func (opts *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions {
opts.msgRouter.setDefaultHandler(defaultHandler)
return opts
}
// SetOnConnectionLost will set the OnConnectionLost callback to be executed
// in the case where the client unexpectedly loses connection with the MQTT broker.
func (opts *ClientOptions) SetOnConnectionLost(onLost OnConnectionLost) *ClientOptions {
opts.onconnlost = onLost
return opts
}
// SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
// timeout error. A duration of 0 never times out.
func (opts *ClientOptions) SetWriteTimeout(t time.Duration) {
opts.writeTimeout = t
}