Skip to content

Commit

Permalink
Enable mtls communication with mqtt broker (#15)
Browse files Browse the repository at this point in the history
* enable mtls

Signed-off-by: Mirko Teodorovic <[email protected]>

* enable mtls

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix config endpoint

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix double broker connection

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix disconnect problem

Signed-off-by: Mirko Teodorovic <[email protected]>

* killing the white lines

Signed-off-by: Mirko Teodorovic <[email protected]>

* small corrections

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove some types and typos

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix logger usage

Signed-off-by: Mirko Teodorovic <[email protected]>

* mqtt is moved to conn

Signed-off-by: Mirko Teodorovic <[email protected]>

* resolve comments

Signed-off-by: Mirko Teodorovic <[email protected]>

* resolve comments

Signed-off-by: Mirko Teodorovic <[email protected]>

* kill whit line

Signed-off-by: Mirko Teodorovic <[email protected]>
  • Loading branch information
mteodor authored Feb 7, 2020
1 parent 22484cb commit c8bcea3
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 107 deletions.
170 changes: 132 additions & 38 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package main

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"syscall"

paho "github.com/eclipse/paho.mqtt.golang"
mqtt "github.com/eclipse/paho.mqtt.golang"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/api"
"github.com/mainflux/agent/internal/pkg/bootstrap"
"github.com/mainflux/agent/internal/pkg/config"
"github.com/mainflux/agent/internal/pkg/mqtt"
"github.com/mainflux/agent/internal/pkg/conn"
"github.com/mainflux/agent/pkg/edgex"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
Expand All @@ -32,11 +35,19 @@ const (
defLogLevel = "info"
defEdgexURL = "http://localhost:48090/api/v1/"
defMqttURL = "localhost:1883"
defThingID = "2dce1d65-73b4-4020-bfe3-403d851386e7"
defThingKey = "1ff0d0f0-ea04-4fbb-83c4-c10b110bf566"
defCtrlChan = "f36c3733-95a3-481c-a314-4125e03d8993"
defDataChan = "ea353dac-0298-4fbb-9e5d-501e3699949c"
defEncryption = "false"
defMqttUsername = ""
defMqttPassword = ""
defMqttChannel = ""
defMqttSkipTLSVer = "true"
defMqttMTLS = "false"
defMqttCA = "ca.crt"
defMqttQoS = "0"
defMqttRetain = false
defMqttCert = "thing.cert"
defMqttPrivKey = "thing.key"
defConfigFile = "config.toml"
defNatsURL = nats.DefaultURL

Expand All @@ -56,6 +67,16 @@ const (
envDataChan = "MF_AGENT_DATA_CHANNEL"
envEncryption = "MF_AGENT_ENCRYPTION"
envNatsURL = "MF_AGENT_NATS_URL"

envMqttUsername = "MF_AGENT_MQTT_USERNAME"
envMqttPassword = "MF_AGENT_MQTT_PASSWORD"
envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS"
envMqttMTLS = "MF_AGENT_MQTT_MTLS"
envMqttCA = "MF_AGENT_MQTT_CA"
envMqttQoS = "MF_AGENT_MQTT_QOS"
envMqttRetain = "MF_AGENT_MQTT_RETAIN"
envMqttCert = "MF_AGENT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK"
)

func main() {
Expand All @@ -66,21 +87,28 @@ func main() {

cfg, err := loadConfig(logger)
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error()))
logger.Error(fmt.Sprintf("Failed to load config: %s", err.Error()))
os.Exit(1)
}

nc, err := nats.Connect(cfg.Agent.Server.NatsURL)
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL))
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL))
os.Exit(1)
}
defer nc.Close()

mqttClient := connectToMQTTBroker(cfg.Agent.MQTT.URL, cfg.Agent.Thing.ID, cfg.Agent.Thing.Key, logger)
mqttClient, err := connectToMQTTBroker(cfg.Agent.MQTT, logger)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
edgexClient := edgex.NewClient(cfg.Agent.Edgex.URL, logger)

svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger)
if err != nil {
log.Fatalf(fmt.Sprintf("Error in agent service: %s", err.Error()))
logger.Error(fmt.Sprintf("Error in agent service: %s", err.Error()))
os.Exit(1)
}

svc = api.LoggingMiddleware(svc, logger)
Expand Down Expand Up @@ -138,60 +166,126 @@ func loadConfig(logger logger.Logger) (config.Config, error) {
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envLogLevel, defLogLevel),
}
tc := config.ThingConf{
ID: mainflux.Env(envThingID, defThingID),
Key: mainflux.Env(envThingKey, defThingKey),
}
cc := config.ChanConf{
Control: mainflux.Env(envCtrlChan, defCtrlChan),
Data: mainflux.Env(envDataChan, defDataChan),
}
ec := config.EdgexConf{URL: mainflux.Env(envEdgexURL, defEdgexURL)}
lc := config.LogConf{Level: mainflux.Env(envLogLevel, defLogLevel)}
mc := config.MQTTConf{URL: mainflux.Env(envMqttURL, defMqttURL)}

c := config.New(sc, tc, cc, ec, lc, mc, file)
mc := config.MQTTConf{
URL: mainflux.Env(envMqttURL, defMqttURL),
Username: mainflux.Env(envMqttUsername, defMqttUsername),
Password: mainflux.Env(envMqttPassword, defMqttPassword),
}

c := config.New(sc, cc, ec, lc, mc, file)
if err := c.Read(); err != nil {
logger.Error(fmt.Sprintf("Failed to read config: %s", err))
return config.Config{}, err
}

mc, err := loadCertificate(c.Agent.MQTT)
if err != nil {
logger.Error(fmt.Sprintf("Failed to set up mtls certs %s", err))
}
c.Agent.MQTT = mc

return *c, nil
}

func connectToMQTTBroker(mqttURL, thingID, thingKey string, logger logger.Logger) paho.Client {
clientID := fmt.Sprintf("agent-%s", thingID)
opts := paho.NewClientOptions()
opts.AddBroker(mqttURL)
opts.SetClientID(clientID)
opts.SetUsername(thingID)
opts.SetPassword(thingKey)
opts.SetCleanSession(true)
opts.SetAutoReconnect(true)
opts.SetOnConnectHandler(func(c paho.Client) {
logger.Info("Connected to MQTT broker")
})
opts.SetConnectionLostHandler(func(c paho.Client, err error) {
logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error()))
os.Exit(1)
})
func connectToMQTTBroker(conf config.MQTTConf, logger logger.Logger) (mqtt.Client, error) {
name := fmt.Sprintf("agent-%s", conf.Username)
conn := func(client mqtt.Client) {
logger.Info(fmt.Sprintf("Client %s connected", name))
}

client := paho.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Error(fmt.Sprintf("Failed to connect to MQTT broker: %s", token.Error()))
os.Exit(1)
lost := func(client mqtt.Client, err error) {
logger.Info(fmt.Sprintf("Client %s disconnected", name))
}

opts := mqtt.NewClientOptions().
AddBroker(conf.URL).
SetClientID(name).
SetCleanSession(true).
SetAutoReconnect(true).
SetOnConnectHandler(conn).
SetConnectionLostHandler(lost)

if conf.Username != "" && conf.Password != "" {
opts.SetUsername(conf.Username)
opts.SetPassword(conf.Password)
}

return client
if conf.MTLS {
cfg := &tls.Config{
InsecureSkipVerify: conf.SkipTLSVer,
}

if conf.CA != nil {
cfg.RootCAs = x509.NewCertPool()
cfg.RootCAs.AppendCertsFromPEM(conf.CA)
}
if conf.Cert.Certificate != nil {
cfg.Certificates = []tls.Certificate{conf.Cert}
}

cfg.BuildNameToCertificate()
opts.SetTLSConfig(cfg)
opts.SetProtocolVersion(4)
}
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()

if token.Error() != nil {
return nil, token.Error()
}
return client, nil
}

func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) {
broker := mqtt.NewBroker(svc, mc, nc, logger)
func subscribeToMQTTBroker(svc agent.Service, mc mqtt.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) {
broker := conn.NewBroker(svc, mc, nc, logger)
topic := fmt.Sprintf("channels/%s/messages", ctrlChan)
if err := broker.Subscribe(topic); err != nil {
logger.Error(fmt.Sprintf("Failed to subscribe to MQTT broker: %s", err.Error()))
os.Exit(1)
}
logger.Info("Subscribed to MQTT broker")
}

func loadCertificate(cfg config.MQTTConf) (config.MQTTConf, error) {
c := cfg
caByte := []byte{}
cert := tls.Certificate{}
if !cfg.MTLS {
return c, nil
}
caFile, err := os.Open(cfg.CAPath)
defer caFile.Close()
if err != nil {
return c, err
}
caByte, err = ioutil.ReadAll(caFile)
if err != nil {
return c, err
}
clientCert, err := os.Open(cfg.CertPath)
defer clientCert.Close()
if err != nil {
return c, err
}
cc, _ := ioutil.ReadAll(clientCert)
privKey, err := os.Open(cfg.PrivKeyPath)
defer clientCert.Close()
if err != nil {
return c, err
}
pk, _ := ioutil.ReadAll((privKey))
cert, err = tls.X509KeyPair([]byte(cc), []byte(pk))
if err != nil {
return c, err
}
cfg.Cert = cert
cfg.CA = caByte
return c, nil
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down
17 changes: 10 additions & 7 deletions internal/app/agent/api/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ type serverConf struct {
port string `json:"port"`
}

type thingConf struct {
id string `json:"id"`
key string `json:"key"`
}

type chanConf struct {
control string `json:"control"`
data string `json:"data"`
Expand All @@ -26,13 +21,21 @@ type logConf struct {
}

type mqttConf struct {
url string `json:"url"`
url string `json:"url"`
username string `json:"username"`
password string `json:"json"`
mtls bool `json:"mtls"`
skipTLSVer bool `json:"skip_tls_ver"`
retain bool `json:"retain"`
QoS int `json:"qos"`
caPath string `json:"ca_path"`
certPath string `json:"cert_path"`
privKeyPath string `json:"priv_key_path"`
}

// Config struct of Mainflux Agent
type agentConf struct {
server serverConf `json:"server"`
thing thingConf `json:"thing"`
channels chanConf `json:"channels"`
edgex edgexConf `json:"edgex"`
log logConf `json:"log"`
Expand Down
36 changes: 6 additions & 30 deletions internal/app/agent/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,15 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint {
Control: req.agent.channels.control,
Data: req.agent.channels.data,
}
tc := config.ThingConf{
ID: req.agent.thing.id,
Key: req.agent.thing.key,
}
ec := config.EdgexConf{URL: req.agent.edgex.url}
lc := config.LogConf{Level: req.agent.log.level}
mc := config.MQTTConf{URL: req.agent.mqtt.url}
mc := config.MQTTConf{
URL: req.agent.mqtt.url,
Username: req.agent.mqtt.username,
Password: req.agent.mqtt.password,
}
a := config.AgentConf{
Server: sc,
Thing: tc,
Channels: cc,
Edgex: ec,
Log: lc,
Expand All @@ -100,30 +99,7 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint {
func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint {
return func(_ context.Context, request interface{}) (interface{}, error) {
c := svc.Config()

sc := serverConf{port: c.Agent.Server.Port}
cc := chanConf{
control: c.Agent.Channels.Control,
data: c.Agent.Channels.Data,
}
tc := thingConf{
id: c.Agent.Thing.ID,
key: c.Agent.Thing.Key,
}
ec := edgexConf{url: c.Agent.Edgex.URL}
lc := logConf{level: c.Agent.Log.Level}
mc := mqttConf{url: c.Agent.MQTT.URL}
a := agentConf{
server: sc,
thing: tc,
channels: cc,
edgex: ec,
log: lc,
mqtt: mc,
}
res := configRes{agent: a}

return res, nil
return c, nil
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/app/agent/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type addConfigReq struct {

func (req addConfigReq) validate() error {
if req.agent.server.port == "" ||
req.agent.thing.id == "" ||
req.agent.thing.key == "" ||
req.agent.mqtt.username == "" ||
req.agent.mqtt.password == "" ||
req.agent.channels.control == "" ||
req.agent.channels.data == "" ||
req.agent.log.level == "" ||
Expand Down
5 changes: 0 additions & 5 deletions internal/app/agent/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,3 @@ type execRes struct {
Name string `json:"n"`
Value string `json:"vs"`
}

type configRes struct {
agent agentConf
file string
}
12 changes: 6 additions & 6 deletions internal/pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,19 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error {
NatsURL: ic.NatsURL,
}

tc := config.ThingConf{
ID: dc.MainfluxID,
Key: dc.MainfluxKey,
}
cc := config.ChanConf{
Control: ctrlChan,
Data: dataChan,
}
ec := config.EdgexConf{URL: ic.EdgexURL}
lc := config.LogConf{Level: ic.LogLevel}
mc := config.MQTTConf{URL: ic.MqttURL}
mc := config.MQTTConf{
URL: ic.MqttURL,
Password: dc.MainfluxKey,
Username: dc.MainfluxID,
}

c := config.New(sc, tc, cc, ec, lc, mc, file)
c := config.New(sc, cc, ec, lc, mc, file)

return c.Save()
}
Expand Down
Loading

0 comments on commit c8bcea3

Please sign in to comment.