From c8bcea34c4ede0f2bc0c3fc293545ded811b9f57 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 7 Feb 2020 14:15:37 +0100 Subject: [PATCH] Enable mtls communication with mqtt broker (#15) * enable mtls Signed-off-by: Mirko Teodorovic * enable mtls Signed-off-by: Mirko Teodorovic * fix config endpoint Signed-off-by: Mirko Teodorovic * fix double broker connection Signed-off-by: Mirko Teodorovic * fix disconnect problem Signed-off-by: Mirko Teodorovic * killing the white lines Signed-off-by: Mirko Teodorovic * small corrections Signed-off-by: Mirko Teodorovic * remove some types and typos Signed-off-by: Mirko Teodorovic * fix logger usage Signed-off-by: Mirko Teodorovic * mqtt is moved to conn Signed-off-by: Mirko Teodorovic * resolve comments Signed-off-by: Mirko Teodorovic * resolve comments Signed-off-by: Mirko Teodorovic * kill whit line Signed-off-by: Mirko Teodorovic --- cmd/main.go | 170 ++++++++++++++++----- go.sum | 1 + internal/app/agent/api/common.go | 17 ++- internal/app/agent/api/endpoints.go | 36 +---- internal/app/agent/api/requests.go | 4 +- internal/app/agent/api/responses.go | 5 - internal/pkg/bootstrap/bootstrap.go | 12 +- internal/pkg/config/config.go | 23 +-- internal/pkg/{mqtt/sub.go => conn/conn.go} | 20 +-- 9 files changed, 181 insertions(+), 107 deletions(-) rename internal/pkg/{mqtt/sub.go => conn/conn.go} (83%) diff --git a/cmd/main.go b/cmd/main.go index 2864c472..e035f476 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,20 +1,23 @@ package main import ( + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "log" "net/http" "os" "os/signal" "syscall" - paho "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/mainflux/agent/internal/app/agent" "github.com/mainflux/agent/internal/app/agent/api" "github.com/mainflux/agent/internal/pkg/bootstrap" "github.com/mainflux/agent/internal/pkg/config" - "github.com/mainflux/agent/internal/pkg/mqtt" + "github.com/mainflux/agent/internal/pkg/conn" "github.com/mainflux/agent/pkg/edgex" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" @@ -32,11 +35,19 @@ const ( defLogLevel = "info" defEdgexURL = "http://localhost:48090/api/v1/" defMqttURL = "localhost:1883" - defThingID = "2dce1d65-73b4-4020-bfe3-403d851386e7" - defThingKey = "1ff0d0f0-ea04-4fbb-83c4-c10b110bf566" defCtrlChan = "f36c3733-95a3-481c-a314-4125e03d8993" defDataChan = "ea353dac-0298-4fbb-9e5d-501e3699949c" defEncryption = "false" + defMqttUsername = "" + defMqttPassword = "" + defMqttChannel = "" + defMqttSkipTLSVer = "true" + defMqttMTLS = "false" + defMqttCA = "ca.crt" + defMqttQoS = "0" + defMqttRetain = false + defMqttCert = "thing.cert" + defMqttPrivKey = "thing.key" defConfigFile = "config.toml" defNatsURL = nats.DefaultURL @@ -56,6 +67,16 @@ const ( envDataChan = "MF_AGENT_DATA_CHANNEL" envEncryption = "MF_AGENT_ENCRYPTION" envNatsURL = "MF_AGENT_NATS_URL" + + envMqttUsername = "MF_AGENT_MQTT_USERNAME" + envMqttPassword = "MF_AGENT_MQTT_PASSWORD" + envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS" + envMqttMTLS = "MF_AGENT_MQTT_MTLS" + envMqttCA = "MF_AGENT_MQTT_CA" + envMqttQoS = "MF_AGENT_MQTT_QOS" + envMqttRetain = "MF_AGENT_MQTT_RETAIN" + envMqttCert = "MF_AGENT_MQTT_CLIENT_CERT" + envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK" ) func main() { @@ -66,21 +87,28 @@ func main() { cfg, err := loadConfig(logger) if err != nil { - log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error())) + logger.Error(fmt.Sprintf("Failed to load config: %s", err.Error())) + os.Exit(1) } nc, err := nats.Connect(cfg.Agent.Server.NatsURL) if err != nil { - log.Fatalf(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL)) + logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL)) + os.Exit(1) } defer nc.Close() - mqttClient := connectToMQTTBroker(cfg.Agent.MQTT.URL, cfg.Agent.Thing.ID, cfg.Agent.Thing.Key, logger) + mqttClient, err := connectToMQTTBroker(cfg.Agent.MQTT, logger) + if err != nil { + logger.Error(err.Error()) + os.Exit(1) + } edgexClient := edgex.NewClient(cfg.Agent.Edgex.URL, logger) svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger) if err != nil { - log.Fatalf(fmt.Sprintf("Error in agent service: %s", err.Error())) + logger.Error(fmt.Sprintf("Error in agent service: %s", err.Error())) + os.Exit(1) } svc = api.LoggingMiddleware(svc, logger) @@ -138,56 +166,85 @@ func loadConfig(logger logger.Logger) (config.Config, error) { NatsURL: mainflux.Env(envNatsURL, defNatsURL), Port: mainflux.Env(envLogLevel, defLogLevel), } - tc := config.ThingConf{ - ID: mainflux.Env(envThingID, defThingID), - Key: mainflux.Env(envThingKey, defThingKey), - } cc := config.ChanConf{ Control: mainflux.Env(envCtrlChan, defCtrlChan), Data: mainflux.Env(envDataChan, defDataChan), } ec := config.EdgexConf{URL: mainflux.Env(envEdgexURL, defEdgexURL)} lc := config.LogConf{Level: mainflux.Env(envLogLevel, defLogLevel)} - mc := config.MQTTConf{URL: mainflux.Env(envMqttURL, defMqttURL)} - - c := config.New(sc, tc, cc, ec, lc, mc, file) + mc := config.MQTTConf{ + URL: mainflux.Env(envMqttURL, defMqttURL), + Username: mainflux.Env(envMqttUsername, defMqttUsername), + Password: mainflux.Env(envMqttPassword, defMqttPassword), + } + c := config.New(sc, cc, ec, lc, mc, file) if err := c.Read(); err != nil { logger.Error(fmt.Sprintf("Failed to read config: %s", err)) return config.Config{}, err } + mc, err := loadCertificate(c.Agent.MQTT) + if err != nil { + logger.Error(fmt.Sprintf("Failed to set up mtls certs %s", err)) + } + c.Agent.MQTT = mc + return *c, nil } -func connectToMQTTBroker(mqttURL, thingID, thingKey string, logger logger.Logger) paho.Client { - clientID := fmt.Sprintf("agent-%s", thingID) - opts := paho.NewClientOptions() - opts.AddBroker(mqttURL) - opts.SetClientID(clientID) - opts.SetUsername(thingID) - opts.SetPassword(thingKey) - opts.SetCleanSession(true) - opts.SetAutoReconnect(true) - opts.SetOnConnectHandler(func(c paho.Client) { - logger.Info("Connected to MQTT broker") - }) - opts.SetConnectionLostHandler(func(c paho.Client, err error) { - logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error())) - os.Exit(1) - }) +func connectToMQTTBroker(conf config.MQTTConf, logger logger.Logger) (mqtt.Client, error) { + name := fmt.Sprintf("agent-%s", conf.Username) + conn := func(client mqtt.Client) { + logger.Info(fmt.Sprintf("Client %s connected", name)) + } - client := paho.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - logger.Error(fmt.Sprintf("Failed to connect to MQTT broker: %s", token.Error())) - os.Exit(1) + lost := func(client mqtt.Client, err error) { + logger.Info(fmt.Sprintf("Client %s disconnected", name)) + } + + opts := mqtt.NewClientOptions(). + AddBroker(conf.URL). + SetClientID(name). + SetCleanSession(true). + SetAutoReconnect(true). + SetOnConnectHandler(conn). + SetConnectionLostHandler(lost) + + if conf.Username != "" && conf.Password != "" { + opts.SetUsername(conf.Username) + opts.SetPassword(conf.Password) } - return client + if conf.MTLS { + cfg := &tls.Config{ + InsecureSkipVerify: conf.SkipTLSVer, + } + + if conf.CA != nil { + cfg.RootCAs = x509.NewCertPool() + cfg.RootCAs.AppendCertsFromPEM(conf.CA) + } + if conf.Cert.Certificate != nil { + cfg.Certificates = []tls.Certificate{conf.Cert} + } + + cfg.BuildNameToCertificate() + opts.SetTLSConfig(cfg) + opts.SetProtocolVersion(4) + } + client := mqtt.NewClient(opts) + token := client.Connect() + token.Wait() + + if token.Error() != nil { + return nil, token.Error() + } + return client, nil } -func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) { - broker := mqtt.NewBroker(svc, mc, nc, logger) +func subscribeToMQTTBroker(svc agent.Service, mc mqtt.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) { + broker := conn.NewBroker(svc, mc, nc, logger) topic := fmt.Sprintf("channels/%s/messages", ctrlChan) if err := broker.Subscribe(topic); err != nil { logger.Error(fmt.Sprintf("Failed to subscribe to MQTT broker: %s", err.Error())) @@ -195,3 +252,40 @@ func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, n } logger.Info("Subscribed to MQTT broker") } + +func loadCertificate(cfg config.MQTTConf) (config.MQTTConf, error) { + c := cfg + caByte := []byte{} + cert := tls.Certificate{} + if !cfg.MTLS { + return c, nil + } + caFile, err := os.Open(cfg.CAPath) + defer caFile.Close() + if err != nil { + return c, err + } + caByte, err = ioutil.ReadAll(caFile) + if err != nil { + return c, err + } + clientCert, err := os.Open(cfg.CertPath) + defer clientCert.Close() + if err != nil { + return c, err + } + cc, _ := ioutil.ReadAll(clientCert) + privKey, err := os.Open(cfg.PrivKeyPath) + defer clientCert.Close() + if err != nil { + return c, err + } + pk, _ := ioutil.ReadAll((privKey)) + cert, err = tls.X509KeyPair([]byte(cc), []byte(pk)) + if err != nil { + return c, err + } + cfg.Cert = cert + cfg.CA = caByte + return c, nil +} diff --git a/go.sum b/go.sum index b35f7204..1ad12d15 100644 --- a/go.sum +++ b/go.sum @@ -250,6 +250,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/internal/app/agent/api/common.go b/internal/app/agent/api/common.go index 914deebf..c56c1720 100644 --- a/internal/app/agent/api/common.go +++ b/internal/app/agent/api/common.go @@ -7,11 +7,6 @@ type serverConf struct { port string `json:"port"` } -type thingConf struct { - id string `json:"id"` - key string `json:"key"` -} - type chanConf struct { control string `json:"control"` data string `json:"data"` @@ -26,13 +21,21 @@ type logConf struct { } type mqttConf struct { - url string `json:"url"` + url string `json:"url"` + username string `json:"username"` + password string `json:"json"` + mtls bool `json:"mtls"` + skipTLSVer bool `json:"skip_tls_ver"` + retain bool `json:"retain"` + QoS int `json:"qos"` + caPath string `json:"ca_path"` + certPath string `json:"cert_path"` + privKeyPath string `json:"priv_key_path"` } // Config struct of Mainflux Agent type agentConf struct { server serverConf `json:"server"` - thing thingConf `json:"thing"` channels chanConf `json:"channels"` edgex edgexConf `json:"edgex"` log logConf `json:"log"` diff --git a/internal/app/agent/api/endpoints.go b/internal/app/agent/api/endpoints.go index 95bdc936..968aed17 100644 --- a/internal/app/agent/api/endpoints.go +++ b/internal/app/agent/api/endpoints.go @@ -70,16 +70,15 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint { Control: req.agent.channels.control, Data: req.agent.channels.data, } - tc := config.ThingConf{ - ID: req.agent.thing.id, - Key: req.agent.thing.key, - } ec := config.EdgexConf{URL: req.agent.edgex.url} lc := config.LogConf{Level: req.agent.log.level} - mc := config.MQTTConf{URL: req.agent.mqtt.url} + mc := config.MQTTConf{ + URL: req.agent.mqtt.url, + Username: req.agent.mqtt.username, + Password: req.agent.mqtt.password, + } a := config.AgentConf{ Server: sc, - Thing: tc, Channels: cc, Edgex: ec, Log: lc, @@ -100,30 +99,7 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint { func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint { return func(_ context.Context, request interface{}) (interface{}, error) { c := svc.Config() - - sc := serverConf{port: c.Agent.Server.Port} - cc := chanConf{ - control: c.Agent.Channels.Control, - data: c.Agent.Channels.Data, - } - tc := thingConf{ - id: c.Agent.Thing.ID, - key: c.Agent.Thing.Key, - } - ec := edgexConf{url: c.Agent.Edgex.URL} - lc := logConf{level: c.Agent.Log.Level} - mc := mqttConf{url: c.Agent.MQTT.URL} - a := agentConf{ - server: sc, - thing: tc, - channels: cc, - edgex: ec, - log: lc, - mqtt: mc, - } - res := configRes{agent: a} - - return res, nil + return c, nil } } diff --git a/internal/app/agent/api/requests.go b/internal/app/agent/api/requests.go index fa2c4daa..81e5f2d6 100644 --- a/internal/app/agent/api/requests.go +++ b/internal/app/agent/api/requests.go @@ -41,8 +41,8 @@ type addConfigReq struct { func (req addConfigReq) validate() error { if req.agent.server.port == "" || - req.agent.thing.id == "" || - req.agent.thing.key == "" || + req.agent.mqtt.username == "" || + req.agent.mqtt.password == "" || req.agent.channels.control == "" || req.agent.channels.data == "" || req.agent.log.level == "" || diff --git a/internal/app/agent/api/responses.go b/internal/app/agent/api/responses.go index d6146907..6bf4ae41 100644 --- a/internal/app/agent/api/responses.go +++ b/internal/app/agent/api/responses.go @@ -13,8 +13,3 @@ type execRes struct { Name string `json:"n"` Value string `json:"vs"` } - -type configRes struct { - agent agentConf - file string -} diff --git a/internal/pkg/bootstrap/bootstrap.go b/internal/pkg/bootstrap/bootstrap.go index 4841bb71..cef64940 100644 --- a/internal/pkg/bootstrap/bootstrap.go +++ b/internal/pkg/bootstrap/bootstrap.go @@ -97,19 +97,19 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { NatsURL: ic.NatsURL, } - tc := config.ThingConf{ - ID: dc.MainfluxID, - Key: dc.MainfluxKey, - } cc := config.ChanConf{ Control: ctrlChan, Data: dataChan, } ec := config.EdgexConf{URL: ic.EdgexURL} lc := config.LogConf{Level: ic.LogLevel} - mc := config.MQTTConf{URL: ic.MqttURL} + mc := config.MQTTConf{ + URL: ic.MqttURL, + Password: dc.MainfluxKey, + Username: dc.MainfluxID, + } - c := config.New(sc, tc, cc, ec, lc, mc, file) + c := config.New(sc, cc, ec, lc, mc, file) return c.Save() } diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 562c73bd..9da78c65 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -4,6 +4,7 @@ package config import ( + "crypto/tls" "fmt" "io/ioutil" @@ -15,11 +16,6 @@ type ServerConf struct { NatsURL string `toml:"nats_url"` } -type ThingConf struct { - ID string `toml:"id"` - Key string `toml:"key"` -} - type ChanConf struct { Control string `toml:"control"` Data string `toml:"data"` @@ -34,13 +30,23 @@ type LogConf struct { } type MQTTConf struct { - URL string `toml:"url"` + URL string `json:"url" toml:"url"` + Username string `json:"username" toml:"username" mapstructure:"username"` + Password string `json:"password" toml:"password" mapstructure:"password"` + MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"` + SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"` + Retain bool `json:"retain" toml:"retain" mapstructure:"retain"` + QoS int `json:"qos" toml:"qos" mapstructure:"qos"` + CAPath string `json:"ca_path" toml:"ca_path" mapstructure:"ca_path"` + CertPath string `json:"cert_path" toml:"cert_path" mapstructure:"cert_path"` + PrivKeyPath string `json:"priv_key_path" toml:"priv_key_path" mapstructure:"priv_key_path"` + CA []byte `json:"-" toml:"-"` + Cert tls.Certificate `json:"-" toml:"-"` } // Config struct of Mainflux Agent type AgentConf struct { Server ServerConf `toml:"server"` - Thing ThingConf `toml:"thing"` Channels ChanConf `toml:"channels"` Edgex EdgexConf `toml:"edgex"` Log LogConf `toml:"log"` @@ -52,10 +58,9 @@ type Config struct { File string } -func New(sc ServerConf, tc ThingConf, cc ChanConf, ec EdgexConf, lc LogConf, mc MQTTConf, file string) *Config { +func New(sc ServerConf, cc ChanConf, ec EdgexConf, lc LogConf, mc MQTTConf, file string) *Config { ac := AgentConf{ Server: sc, - Thing: tc, Channels: cc, Edgex: ec, Log: lc, diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/conn/conn.go similarity index 83% rename from internal/pkg/mqtt/sub.go rename to internal/pkg/conn/conn.go index 819def3d..2450a2bf 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/conn/conn.go @@ -1,7 +1,7 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 -package mqtt +package conn import ( "fmt" @@ -14,7 +14,7 @@ import ( "github.com/nats-io/go-nats" "robpike.io/filter" - paho "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" ) const ( @@ -40,13 +40,13 @@ type MqttBroker interface { type broker struct { svc agent.Service - client paho.Client + client mqtt.Client logger logger.Logger nats *nats.Conn } // NewBroker returns new MQTT broker instance. -func NewBroker(svc agent.Service, client paho.Client, nats *nats.Conn, log logger.Logger) MqttBroker { +func NewBroker(svc agent.Service, client mqtt.Client, nats *nats.Conn, log logger.Logger) MqttBroker { return &broker{ svc: svc, client: client, @@ -73,7 +73,7 @@ func (b *broker) Subscribe(topic string) error { } // handleNatsMsg triggered when new message is received on MQTT broker -func (b *broker) handleNatsMsg(mc paho.Client, msg paho.Message) { +func (b *broker) handleNatsMsg(mc mqtt.Client, msg mqtt.Message) { if topic := extractNatsTopic(msg.Topic()); topic != "" { b.nats.Publish(topic, msg.Payload()) } @@ -94,7 +94,7 @@ func extractNatsTopic(topic string) string { } // handleMsg triggered when new message is received on MQTT broker -func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { +func (b *broker) handleMsg(mc mqtt.Client, msg mqtt.Message) { sm, err := senml.Decode(msg.Payload(), senml.JSON) if err != nil { b.logger.Warn(fmt.Sprintf("SenML decode failed: %s", err)) @@ -117,14 +117,14 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } case config: - b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + b.logger.Info(fmt.Sprintf("Config service for uuid %s and command string %s", uuid, cmdStr)) if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { - b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) + b.logger.Warn(fmt.Sprintf("Config service operation failed: %s", err)) } case service: - b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + b.logger.Info(fmt.Sprintf("Services view for uuid %s and command string %s", uuid, cmdStr)) if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { - b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) + b.logger.Warn(fmt.Sprintf("Services view operation failed: %s", err)) } }