Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-28 - enabled messages paho persistence #27

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
- added clean session to env variables
- automatically put clean session to false if file message persistence enabled
- updated doc

Signed-off-by: PricelessRabbit <PricelessRabbit@gmail.com>
  • Loading branch information
pricelessrabbit committed Nov 25, 2020
commit b9975c65bae3f1f042f7a942806e092dbcb044cb
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ The service is configured using the environment variables presented in the follo
| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key |
| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 |
| MF_EXPORT_MQTT_RETAIN | MQTT retain | false |
| MF_EXPORT_MQTT_CLEAN_SESSION | MQTT clean session | false |
| MF_EXPORT_MQTT_PERSIST | persist MQTT QOS 2 pending messages in filesystem, to avoid data loss | false |
| MF_EXPORT_MQTT_PERSIST_DIR | directory in which pending messages will be saved if persist is enabled | false |
| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml |
Expand Down
83 changes: 45 additions & 38 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,25 @@ import (
)

const (
svcName = "export"
defNatsURL = nats.DefaultURL
defLogLevel = "debug"
defPort = "8170"
defMqttHost = "tcp://localhost:1883"
defMqttUsername = ""
defMqttPassword = ""
defMqttChannel = ""
defMqttSkipTLSVer = "true"
defMqttMTLS = "false"
defMqttCA = "ca.crt"
defMqttQoS = "0"
defMqttRetain = "false"
defMqttPersist = "false"
defMqttPersistDir = "../mqtt_persist"
defMqttCert = "thing.cert"
defMqttPrivKey = "thing.key"
defConfigFile = "../configs/config.toml"
svcName = "export"
defNatsURL = nats.DefaultURL
defLogLevel = "debug"
defPort = "8170"
defMqttHost = "tcp://localhost:1883"
defMqttUsername = ""
defMqttPassword = ""
defMqttChannel = ""
defMqttSkipTLSVer = "true"
defMqttMTLS = "false"
defMqttCA = "ca.crt"
defMqttQoS = "0"
defMqttRetain = "false"
defMqttCleanSession = "true"
defMqttPersist = "false"
defMqttPersistDir = "../mqtt_persist"
defMqttCert = "thing.cert"
defMqttPrivKey = "thing.key"
defConfigFile = "../configs/config.toml"

defCacheURL = "localhost:6379"
defCachePass = ""
Expand All @@ -56,20 +57,21 @@ const (
envLogLevel = "MF_EXPORT_LOG_LEVEL"
envPort = "MF_EXPORT_PORT"

envMqttHost = "MF_EXPORT_MQTT_HOST"
envMqttUsername = "MF_EXPORT_MQTT_USERNAME"
envMqttPassword = "MF_EXPORT_MQTT_PASSWORD"
envMqttChannel = "MF_EXPORT_MQTT_CHANNEL"
envMqttSkipTLSVer = "MF_EXPORT_MQTT_SKIP_TLS"
envMqttMTLS = "MF_EXPORT_MQTT_MTLS"
envMqttCA = "MF_EXPORT_MQTT_CA"
envMqttQoS = "MF_EXPORT_MQTT_QOS"
envMqttRetain = "MF_EXPORT_MQTT_RETAIN"
envMqttPersist = "MF_MQTT_PERSIST"
envMqttPersistDir = "MF_MQTT_PERSIST_FILE"
envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK"
envConfigFile = "MF_EXPORT_CONFIG_FILE"
envMqttHost = "MF_EXPORT_MQTT_HOST"
envMqttUsername = "MF_EXPORT_MQTT_USERNAME"
envMqttPassword = "MF_EXPORT_MQTT_PASSWORD"
envMqttChannel = "MF_EXPORT_MQTT_CHANNEL"
envMqttSkipTLSVer = "MF_EXPORT_MQTT_SKIP_TLS"
envMqttMTLS = "MF_EXPORT_MQTT_MTLS"
envMqttCA = "MF_EXPORT_MQTT_CA"
envMqttQoS = "MF_EXPORT_MQTT_QOS"
envMqttRetain = "MF_EXPORT_MQTT_RETAIN"
envMqttCleanSession = "MF_EXPORT_MQTT_CLEAN_SESSION"
envMqttPersist = "MF_MQTT_PERSIST"
envMqttPersistDir = "MF_MQTT_PERSIST_FILE"
envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK"
envConfigFile = "MF_EXPORT_CONFIG_FILE"

envCacheURL = "MF_EXPORT_CACHE_URL"
envCachePass = "MF_EXPORT_CACHE_PASS"
Expand Down Expand Up @@ -149,6 +151,10 @@ func loadConfigs() (exp.Config, error) {
if err != nil {
mqttRetain = false
}
mqttCleanSession, err := strconv.ParseBool(mainflux.Env(envMqttCleanSession, defMqttCleanSession))
if err != nil {
mqttRetain = false
}
mqttPersist, err := strconv.ParseBool(mainflux.Env(envMqttPersist, defMqttPersist))
if err != nil {
mqttPersist = false
Expand All @@ -174,12 +180,13 @@ func loadConfigs() (exp.Config, error) {
Password: mainflux.Env(envMqttPassword, defMqttPassword),
Username: mainflux.Env(envMqttUsername, defMqttUsername),

Retain: mqttRetain,
Persist: mqttPersist,
PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir),
QoS: QoS,
MTLS: mqttMTLS,
SkipTLSVer: mqttSkipTLSVer,
Retain: mqttRetain,
CleanSession: mqttCleanSession,
Persist: mqttPersist,
PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir),
QoS: QoS,
MTLS: mqttMTLS,
SkipTLSVer: mqttSkipTLSVer,

CAPath: mainflux.Env(envMqttCA, defMqttCA),
ClientCertPath: mainflux.Env(envMqttCert, defMqttCert),
Expand Down
1 change: 1 addition & 0 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ File = "/configs/export/config.toml"
password = ""
qos = 0
retain = false
clean_session = true
persist = false
persist_dir = "../mqtt_persist"
skip_tls_ver = true
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type MQTT struct {
MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"`
SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"`
Retain bool `json:"retain" toml:"retain" mapstructure:"retain"`
CleanSession bool `json:"clean_session" toml:"clean_session" mapstructure:"clean_session"`
Persist bool `json:"persist" toml:"persist" mapstructure:"persist"`
PersistDir string `json:"persist_dir" toml:"persist_dir" mapstructure:"persist_dir"`
QoS int `json:"qos" toml:"qos" mapstructure:"qos"`
Expand Down
4 changes: 3 additions & 1 deletion pkg/export/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,16 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C
opts := mqtt.NewClientOptions().
AddBroker(conf.MQTT.Host).
SetClientID(e.id).
SetCleanSession(false).
SetCleanSession(conf.MQTT.CleanSession).
SetAutoReconnect(true).
SetOnConnectHandler(e.conn).
SetConnectionLostHandler(e.lost)

if conf.MQTT.Persist {
store := mqtt.NewFileStore(conf.MQTT.PersistDir)
opts.SetStore(store)
//disable clean session because paho deletes stored messages when restarts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments start with the capital letter. Also, add one space after //.

opts.SetCleanSession(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to make this configurable?

Copy link
Contributor Author

@pricelessrabbit pricelessrabbit Nov 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if user set persistent = true but clean session = true, when service restarts messages are lost.
so i make it configurable (clean_session) in the config, but if user sets persist = true, the clean session is set accordingly.

I can change implementation and make the settings completely independent, but in that case user have to manually set clean_session = false when he wants to persist messages in fs

}

if conf.MQTT.Username != "" && conf.MQTT.Password != "" {
Expand Down