Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Export config from content #21

Merged
merged 28 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
build
cmd/config.toml
cmd/config.toml
170 changes: 132 additions & 38 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package main

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"syscall"

paho "github.com/eclipse/paho.mqtt.golang"
mqtt "github.com/eclipse/paho.mqtt.golang"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/api"
"github.com/mainflux/agent/internal/pkg/bootstrap"
"github.com/mainflux/agent/internal/pkg/config"
"github.com/mainflux/agent/internal/pkg/mqtt"
"github.com/mainflux/agent/internal/pkg/conn"
"github.com/mainflux/agent/pkg/edgex"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
Expand All @@ -32,11 +35,19 @@ const (
defLogLevel = "info"
defEdgexURL = "http://localhost:48090/api/v1/"
defMqttURL = "localhost:1883"
defThingID = "2dce1d65-73b4-4020-bfe3-403d851386e7"
defThingKey = "1ff0d0f0-ea04-4fbb-83c4-c10b110bf566"
defCtrlChan = "f36c3733-95a3-481c-a314-4125e03d8993"
defDataChan = "ea353dac-0298-4fbb-9e5d-501e3699949c"
defEncryption = "false"
defMqttUsername = ""
defMqttPassword = ""
defMqttChannel = ""
defMqttSkipTLSVer = "true"
defMqttMTLS = "false"
defMqttCA = "ca.crt"
defMqttQoS = "0"
defMqttRetain = false
defMqttCert = "thing.cert"
defMqttPrivKey = "thing.key"
defConfigFile = "config.toml"
defNatsURL = nats.DefaultURL

Expand All @@ -56,6 +67,16 @@ const (
envDataChan = "MF_AGENT_DATA_CHANNEL"
envEncryption = "MF_AGENT_ENCRYPTION"
envNatsURL = "MF_AGENT_NATS_URL"

envMqttUsername = "MF_AGENT_MQTT_USERNAME"
envMqttPassword = "MF_AGENT_MQTT_PASSWORD"
envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS"
envMqttMTLS = "MF_AGENT_MQTT_MTLS"
envMqttCA = "MF_AGENT_MQTT_CA"
envMqttQoS = "MF_AGENT_MQTT_QOS"
envMqttRetain = "MF_AGENT_MQTT_RETAIN"
envMqttCert = "MF_AGENT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK"
)

func main() {
Expand All @@ -66,21 +87,28 @@ func main() {

cfg, err := loadConfig(logger)
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error()))
logger.Error(fmt.Sprintf("Failed to load config: %s", err.Error()))
os.Exit(1)
}

nc, err := nats.Connect(cfg.Agent.Server.NatsURL)
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL))
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL))
os.Exit(1)
}
defer nc.Close()

mqttClient := connectToMQTTBroker(cfg.Agent.MQTT.URL, cfg.Agent.Thing.ID, cfg.Agent.Thing.Key, logger)
mqttClient, err := connectToMQTTBroker(cfg.Agent.MQTT, logger)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
edgexClient := edgex.NewClient(cfg.Agent.Edgex.URL, logger)

svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger)
if err != nil {
log.Fatalf(fmt.Sprintf("Error in agent service: %s", err.Error()))
logger.Error(fmt.Sprintf("Error in agent service: %s", err.Error()))
os.Exit(1)
}

svc = api.LoggingMiddleware(svc, logger)
Expand Down Expand Up @@ -138,60 +166,126 @@ func loadConfig(logger logger.Logger) (config.Config, error) {
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envLogLevel, defLogLevel),
}
tc := config.ThingConf{
ID: mainflux.Env(envThingID, defThingID),
Key: mainflux.Env(envThingKey, defThingKey),
}
cc := config.ChanConf{
Control: mainflux.Env(envCtrlChan, defCtrlChan),
Data: mainflux.Env(envDataChan, defDataChan),
}
ec := config.EdgexConf{URL: mainflux.Env(envEdgexURL, defEdgexURL)}
lc := config.LogConf{Level: mainflux.Env(envLogLevel, defLogLevel)}
mc := config.MQTTConf{URL: mainflux.Env(envMqttURL, defMqttURL)}

c := config.New(sc, tc, cc, ec, lc, mc, file)
mc := config.MQTTConf{
URL: mainflux.Env(envMqttURL, defMqttURL),
Username: mainflux.Env(envMqttUsername, defMqttUsername),
Password: mainflux.Env(envMqttPassword, defMqttPassword),
}

c := config.New(sc, cc, ec, lc, mc, file)
if err := c.Read(); err != nil {
logger.Error(fmt.Sprintf("Failed to read config: %s", err))
return config.Config{}, err
}

mc, err := loadCertificate(c.Agent.MQTT)
if err != nil {
logger.Error(fmt.Sprintf("Failed to set up mtls certs %s", err))
}
c.Agent.MQTT = mc

return *c, nil
}

func connectToMQTTBroker(mqttURL, thingID, thingKey string, logger logger.Logger) paho.Client {
clientID := fmt.Sprintf("agent-%s", thingID)
opts := paho.NewClientOptions()
opts.AddBroker(mqttURL)
opts.SetClientID(clientID)
opts.SetUsername(thingID)
opts.SetPassword(thingKey)
opts.SetCleanSession(true)
opts.SetAutoReconnect(true)
opts.SetOnConnectHandler(func(c paho.Client) {
logger.Info("Connected to MQTT broker")
})
opts.SetConnectionLostHandler(func(c paho.Client, err error) {
logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error()))
os.Exit(1)
})
func connectToMQTTBroker(conf config.MQTTConf, logger logger.Logger) (mqtt.Client, error) {
name := fmt.Sprintf("agent-%s", conf.Username)
conn := func(client mqtt.Client) {
logger.Info(fmt.Sprintf("Client %s connected", name))
}

client := paho.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Error(fmt.Sprintf("Failed to connect to MQTT broker: %s", token.Error()))
os.Exit(1)
lost := func(client mqtt.Client, err error) {
logger.Info(fmt.Sprintf("Client %s disconnected", name))
}

opts := mqtt.NewClientOptions().
AddBroker(conf.URL).
SetClientID(name).
SetCleanSession(true).
SetAutoReconnect(true).
SetOnConnectHandler(conn).
SetConnectionLostHandler(lost)

if conf.Username != "" && conf.Password != "" {
opts.SetUsername(conf.Username)
opts.SetPassword(conf.Password)
}

return client
if conf.MTLS {
cfg := &tls.Config{
InsecureSkipVerify: conf.SkipTLSVer,
}

if conf.CA != nil {
cfg.RootCAs = x509.NewCertPool()
cfg.RootCAs.AppendCertsFromPEM(conf.CA)
}
if conf.Cert.Certificate != nil {
cfg.Certificates = []tls.Certificate{conf.Cert}
}

cfg.BuildNameToCertificate()
opts.SetTLSConfig(cfg)
opts.SetProtocolVersion(4)
}
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()

if token.Error() != nil {
return nil, token.Error()
}
return client, nil
}

func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) {
broker := mqtt.NewBroker(svc, mc, nc, logger)
func subscribeToMQTTBroker(svc agent.Service, mc mqtt.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) {
broker := conn.NewBroker(svc, mc, nc, logger)
topic := fmt.Sprintf("channels/%s/messages", ctrlChan)
if err := broker.Subscribe(topic); err != nil {
logger.Error(fmt.Sprintf("Failed to subscribe to MQTT broker: %s", err.Error()))
os.Exit(1)
}
logger.Info("Subscribed to MQTT broker")
}

func loadCertificate(cfg config.MQTTConf) (config.MQTTConf, error) {
c := cfg
caByte := []byte{}
cert := tls.Certificate{}
if !cfg.MTLS {
return c, nil
}
caFile, err := os.Open(cfg.CAPath)
defer caFile.Close()
if err != nil {
return c, err
}
caByte, err = ioutil.ReadAll(caFile)
if err != nil {
return c, err
}
clientCert, err := os.Open(cfg.CertPath)
defer clientCert.Close()
if err != nil {
return c, err
}
cc, _ := ioutil.ReadAll(clientCert)
privKey, err := os.Open(cfg.PrivKeyPath)
defer clientCert.Close()
if err != nil {
return c, err
}
pk, _ := ioutil.ReadAll((privKey))
cert, err = tls.X509KeyPair([]byte(cc), []byte(pk))
if err != nil {
return c, err
}
cfg.Cert = cert
cfg.CA = caByte
return c, nil
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down
17 changes: 10 additions & 7 deletions internal/app/agent/api/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ type serverConf struct {
port string `json:"port"`
}

type thingConf struct {
id string `json:"id"`
key string `json:"key"`
}

type chanConf struct {
control string `json:"control"`
data string `json:"data"`
Expand All @@ -26,13 +21,21 @@ type logConf struct {
}

type mqttConf struct {
url string `json:"url"`
url string `json:"url"`
username string `json:"username"`
password string `json:"json"`
mtls bool `json:"mtls"`
skipTLSVer bool `json:"skip_tls_ver"`
retain bool `json:"retain"`
QoS int `json:"qos"`
caPath string `json:"ca_path"`
certPath string `json:"cert_path"`
privKeyPath string `json:"priv_key_path"`
}

// Config struct of Mainflux Agent
type agentConf struct {
server serverConf `json:"server"`
thing thingConf `json:"thing"`
channels chanConf `json:"channels"`
edgex edgexConf `json:"edgex"`
log logConf `json:"log"`
Expand Down
36 changes: 6 additions & 30 deletions internal/app/agent/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,15 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint {
Control: req.agent.channels.control,
Data: req.agent.channels.data,
}
tc := config.ThingConf{
ID: req.agent.thing.id,
Key: req.agent.thing.key,
}
ec := config.EdgexConf{URL: req.agent.edgex.url}
lc := config.LogConf{Level: req.agent.log.level}
mc := config.MQTTConf{URL: req.agent.mqtt.url}
mc := config.MQTTConf{
URL: req.agent.mqtt.url,
Username: req.agent.mqtt.username,
Password: req.agent.mqtt.password,
}
a := config.AgentConf{
Server: sc,
Thing: tc,
Channels: cc,
Edgex: ec,
Log: lc,
Expand All @@ -100,30 +99,7 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint {
func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint {
return func(_ context.Context, request interface{}) (interface{}, error) {
c := svc.Config()

sc := serverConf{port: c.Agent.Server.Port}
cc := chanConf{
control: c.Agent.Channels.Control,
data: c.Agent.Channels.Data,
}
tc := thingConf{
id: c.Agent.Thing.ID,
key: c.Agent.Thing.Key,
}
ec := edgexConf{url: c.Agent.Edgex.URL}
lc := logConf{level: c.Agent.Log.Level}
mc := mqttConf{url: c.Agent.MQTT.URL}
a := agentConf{
server: sc,
thing: tc,
channels: cc,
edgex: ec,
log: lc,
mqtt: mc,
}
res := configRes{agent: a}

return res, nil
return c, nil
}
}

Expand Down
3 changes: 1 addition & 2 deletions internal/app/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
log "github.com/mainflux/mainflux/logger"
)
Expand Down Expand Up @@ -99,7 +98,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) {
return lm.svc.ServiceConfig(uuid, cmdStr)
}

func (lm loggingMiddleware) Services() map[string]*services.Service {
func (lm loggingMiddleware) Services() []agent.ServiceInfo {
defer func(begin time.Time) {
message := fmt.Sprintf("Method services took %s to complete", time.Since(begin))
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
Expand Down
3 changes: 1 addition & 2 deletions internal/app/agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/go-kit/kit/metrics"
"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
)

Expand Down Expand Up @@ -76,7 +75,7 @@ func (ms *metricsMiddleware) Config() config.Config {
return ms.svc.Config()
}

func (ms *metricsMiddleware) Services() map[string]*services.Service {
func (ms *metricsMiddleware) Services() []agent.ServiceInfo {
defer func(begin time.Time) {
ms.counter.With("method", "services").Add(1)
ms.latency.With("method", "services").Observe(time.Since(begin).Seconds())
Expand Down
Loading