diff --git a/.gitignore b/.gitignore index 6565ee3d..2b1c8246 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ build -cmd/config.toml \ No newline at end of file +cmd/config.toml diff --git a/cmd/main.go b/cmd/main.go index 2864c472..e035f476 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,20 +1,23 @@ package main import ( + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "log" "net/http" "os" "os/signal" "syscall" - paho "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/mainflux/agent/internal/app/agent" "github.com/mainflux/agent/internal/app/agent/api" "github.com/mainflux/agent/internal/pkg/bootstrap" "github.com/mainflux/agent/internal/pkg/config" - "github.com/mainflux/agent/internal/pkg/mqtt" + "github.com/mainflux/agent/internal/pkg/conn" "github.com/mainflux/agent/pkg/edgex" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" @@ -32,11 +35,19 @@ const ( defLogLevel = "info" defEdgexURL = "http://localhost:48090/api/v1/" defMqttURL = "localhost:1883" - defThingID = "2dce1d65-73b4-4020-bfe3-403d851386e7" - defThingKey = "1ff0d0f0-ea04-4fbb-83c4-c10b110bf566" defCtrlChan = "f36c3733-95a3-481c-a314-4125e03d8993" defDataChan = "ea353dac-0298-4fbb-9e5d-501e3699949c" defEncryption = "false" + defMqttUsername = "" + defMqttPassword = "" + defMqttChannel = "" + defMqttSkipTLSVer = "true" + defMqttMTLS = "false" + defMqttCA = "ca.crt" + defMqttQoS = "0" + defMqttRetain = false + defMqttCert = "thing.cert" + defMqttPrivKey = "thing.key" defConfigFile = "config.toml" defNatsURL = nats.DefaultURL @@ -56,6 +67,16 @@ const ( envDataChan = "MF_AGENT_DATA_CHANNEL" envEncryption = "MF_AGENT_ENCRYPTION" envNatsURL = "MF_AGENT_NATS_URL" + + envMqttUsername = "MF_AGENT_MQTT_USERNAME" + envMqttPassword = "MF_AGENT_MQTT_PASSWORD" + envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS" + envMqttMTLS = "MF_AGENT_MQTT_MTLS" + envMqttCA = "MF_AGENT_MQTT_CA" + envMqttQoS = "MF_AGENT_MQTT_QOS" + envMqttRetain = "MF_AGENT_MQTT_RETAIN" + envMqttCert = "MF_AGENT_MQTT_CLIENT_CERT" + envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK" ) func main() { @@ -66,21 +87,28 @@ func main() { cfg, err := loadConfig(logger) if err != nil { - log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error())) + logger.Error(fmt.Sprintf("Failed to load config: %s", err.Error())) + os.Exit(1) } nc, err := nats.Connect(cfg.Agent.Server.NatsURL) if err != nil { - log.Fatalf(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL)) + logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL)) + os.Exit(1) } defer nc.Close() - mqttClient := connectToMQTTBroker(cfg.Agent.MQTT.URL, cfg.Agent.Thing.ID, cfg.Agent.Thing.Key, logger) + mqttClient, err := connectToMQTTBroker(cfg.Agent.MQTT, logger) + if err != nil { + logger.Error(err.Error()) + os.Exit(1) + } edgexClient := edgex.NewClient(cfg.Agent.Edgex.URL, logger) svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger) if err != nil { - log.Fatalf(fmt.Sprintf("Error in agent service: %s", err.Error())) + logger.Error(fmt.Sprintf("Error in agent service: %s", err.Error())) + os.Exit(1) } svc = api.LoggingMiddleware(svc, logger) @@ -138,56 +166,85 @@ func loadConfig(logger logger.Logger) (config.Config, error) { NatsURL: mainflux.Env(envNatsURL, defNatsURL), Port: mainflux.Env(envLogLevel, defLogLevel), } - tc := config.ThingConf{ - ID: mainflux.Env(envThingID, defThingID), - Key: mainflux.Env(envThingKey, defThingKey), - } cc := config.ChanConf{ Control: mainflux.Env(envCtrlChan, defCtrlChan), Data: mainflux.Env(envDataChan, defDataChan), } ec := config.EdgexConf{URL: mainflux.Env(envEdgexURL, defEdgexURL)} lc := config.LogConf{Level: mainflux.Env(envLogLevel, defLogLevel)} - mc := config.MQTTConf{URL: mainflux.Env(envMqttURL, defMqttURL)} - - c := config.New(sc, tc, cc, ec, lc, mc, file) + mc := config.MQTTConf{ + URL: mainflux.Env(envMqttURL, defMqttURL), + Username: mainflux.Env(envMqttUsername, defMqttUsername), + Password: mainflux.Env(envMqttPassword, defMqttPassword), + } + c := config.New(sc, cc, ec, lc, mc, file) if err := c.Read(); err != nil { logger.Error(fmt.Sprintf("Failed to read config: %s", err)) return config.Config{}, err } + mc, err := loadCertificate(c.Agent.MQTT) + if err != nil { + logger.Error(fmt.Sprintf("Failed to set up mtls certs %s", err)) + } + c.Agent.MQTT = mc + return *c, nil } -func connectToMQTTBroker(mqttURL, thingID, thingKey string, logger logger.Logger) paho.Client { - clientID := fmt.Sprintf("agent-%s", thingID) - opts := paho.NewClientOptions() - opts.AddBroker(mqttURL) - opts.SetClientID(clientID) - opts.SetUsername(thingID) - opts.SetPassword(thingKey) - opts.SetCleanSession(true) - opts.SetAutoReconnect(true) - opts.SetOnConnectHandler(func(c paho.Client) { - logger.Info("Connected to MQTT broker") - }) - opts.SetConnectionLostHandler(func(c paho.Client, err error) { - logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error())) - os.Exit(1) - }) +func connectToMQTTBroker(conf config.MQTTConf, logger logger.Logger) (mqtt.Client, error) { + name := fmt.Sprintf("agent-%s", conf.Username) + conn := func(client mqtt.Client) { + logger.Info(fmt.Sprintf("Client %s connected", name)) + } - client := paho.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - logger.Error(fmt.Sprintf("Failed to connect to MQTT broker: %s", token.Error())) - os.Exit(1) + lost := func(client mqtt.Client, err error) { + logger.Info(fmt.Sprintf("Client %s disconnected", name)) + } + + opts := mqtt.NewClientOptions(). + AddBroker(conf.URL). + SetClientID(name). + SetCleanSession(true). + SetAutoReconnect(true). + SetOnConnectHandler(conn). + SetConnectionLostHandler(lost) + + if conf.Username != "" && conf.Password != "" { + opts.SetUsername(conf.Username) + opts.SetPassword(conf.Password) } - return client + if conf.MTLS { + cfg := &tls.Config{ + InsecureSkipVerify: conf.SkipTLSVer, + } + + if conf.CA != nil { + cfg.RootCAs = x509.NewCertPool() + cfg.RootCAs.AppendCertsFromPEM(conf.CA) + } + if conf.Cert.Certificate != nil { + cfg.Certificates = []tls.Certificate{conf.Cert} + } + + cfg.BuildNameToCertificate() + opts.SetTLSConfig(cfg) + opts.SetProtocolVersion(4) + } + client := mqtt.NewClient(opts) + token := client.Connect() + token.Wait() + + if token.Error() != nil { + return nil, token.Error() + } + return client, nil } -func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) { - broker := mqtt.NewBroker(svc, mc, nc, logger) +func subscribeToMQTTBroker(svc agent.Service, mc mqtt.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) { + broker := conn.NewBroker(svc, mc, nc, logger) topic := fmt.Sprintf("channels/%s/messages", ctrlChan) if err := broker.Subscribe(topic); err != nil { logger.Error(fmt.Sprintf("Failed to subscribe to MQTT broker: %s", err.Error())) @@ -195,3 +252,40 @@ func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, n } logger.Info("Subscribed to MQTT broker") } + +func loadCertificate(cfg config.MQTTConf) (config.MQTTConf, error) { + c := cfg + caByte := []byte{} + cert := tls.Certificate{} + if !cfg.MTLS { + return c, nil + } + caFile, err := os.Open(cfg.CAPath) + defer caFile.Close() + if err != nil { + return c, err + } + caByte, err = ioutil.ReadAll(caFile) + if err != nil { + return c, err + } + clientCert, err := os.Open(cfg.CertPath) + defer clientCert.Close() + if err != nil { + return c, err + } + cc, _ := ioutil.ReadAll(clientCert) + privKey, err := os.Open(cfg.PrivKeyPath) + defer clientCert.Close() + if err != nil { + return c, err + } + pk, _ := ioutil.ReadAll((privKey)) + cert, err = tls.X509KeyPair([]byte(cc), []byte(pk)) + if err != nil { + return c, err + } + cfg.Cert = cert + cfg.CA = caByte + return c, nil +} diff --git a/go.sum b/go.sum index b35f7204..1ad12d15 100644 --- a/go.sum +++ b/go.sum @@ -250,6 +250,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/internal/app/agent/api/common.go b/internal/app/agent/api/common.go index 914deebf..c56c1720 100644 --- a/internal/app/agent/api/common.go +++ b/internal/app/agent/api/common.go @@ -7,11 +7,6 @@ type serverConf struct { port string `json:"port"` } -type thingConf struct { - id string `json:"id"` - key string `json:"key"` -} - type chanConf struct { control string `json:"control"` data string `json:"data"` @@ -26,13 +21,21 @@ type logConf struct { } type mqttConf struct { - url string `json:"url"` + url string `json:"url"` + username string `json:"username"` + password string `json:"json"` + mtls bool `json:"mtls"` + skipTLSVer bool `json:"skip_tls_ver"` + retain bool `json:"retain"` + QoS int `json:"qos"` + caPath string `json:"ca_path"` + certPath string `json:"cert_path"` + privKeyPath string `json:"priv_key_path"` } // Config struct of Mainflux Agent type agentConf struct { server serverConf `json:"server"` - thing thingConf `json:"thing"` channels chanConf `json:"channels"` edgex edgexConf `json:"edgex"` log logConf `json:"log"` diff --git a/internal/app/agent/api/endpoints.go b/internal/app/agent/api/endpoints.go index 95bdc936..968aed17 100644 --- a/internal/app/agent/api/endpoints.go +++ b/internal/app/agent/api/endpoints.go @@ -70,16 +70,15 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint { Control: req.agent.channels.control, Data: req.agent.channels.data, } - tc := config.ThingConf{ - ID: req.agent.thing.id, - Key: req.agent.thing.key, - } ec := config.EdgexConf{URL: req.agent.edgex.url} lc := config.LogConf{Level: req.agent.log.level} - mc := config.MQTTConf{URL: req.agent.mqtt.url} + mc := config.MQTTConf{ + URL: req.agent.mqtt.url, + Username: req.agent.mqtt.username, + Password: req.agent.mqtt.password, + } a := config.AgentConf{ Server: sc, - Thing: tc, Channels: cc, Edgex: ec, Log: lc, @@ -100,30 +99,7 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint { func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint { return func(_ context.Context, request interface{}) (interface{}, error) { c := svc.Config() - - sc := serverConf{port: c.Agent.Server.Port} - cc := chanConf{ - control: c.Agent.Channels.Control, - data: c.Agent.Channels.Data, - } - tc := thingConf{ - id: c.Agent.Thing.ID, - key: c.Agent.Thing.Key, - } - ec := edgexConf{url: c.Agent.Edgex.URL} - lc := logConf{level: c.Agent.Log.Level} - mc := mqttConf{url: c.Agent.MQTT.URL} - a := agentConf{ - server: sc, - thing: tc, - channels: cc, - edgex: ec, - log: lc, - mqtt: mc, - } - res := configRes{agent: a} - - return res, nil + return c, nil } } diff --git a/internal/app/agent/api/logging.go b/internal/app/agent/api/logging.go index e552aab5..60547798 100644 --- a/internal/app/agent/api/logging.go +++ b/internal/app/agent/api/logging.go @@ -8,7 +8,6 @@ import ( "time" "github.com/mainflux/agent/internal/app/agent" - "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" log "github.com/mainflux/mainflux/logger" ) @@ -99,7 +98,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) { return lm.svc.ServiceConfig(uuid, cmdStr) } -func (lm loggingMiddleware) Services() map[string]*services.Service { +func (lm loggingMiddleware) Services() []agent.ServiceInfo { defer func(begin time.Time) { message := fmt.Sprintf("Method services took %s to complete", time.Since(begin)) lm.logger.Info(fmt.Sprintf("%s without errors.", message)) diff --git a/internal/app/agent/api/metrics.go b/internal/app/agent/api/metrics.go index f6414305..9a361feb 100644 --- a/internal/app/agent/api/metrics.go +++ b/internal/app/agent/api/metrics.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/metrics" "github.com/mainflux/agent/internal/app/agent" - "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" ) @@ -76,7 +75,7 @@ func (ms *metricsMiddleware) Config() config.Config { return ms.svc.Config() } -func (ms *metricsMiddleware) Services() map[string]*services.Service { +func (ms *metricsMiddleware) Services() []agent.ServiceInfo { defer func(begin time.Time) { ms.counter.With("method", "services").Add(1) ms.latency.With("method", "services").Observe(time.Since(begin).Seconds()) diff --git a/internal/app/agent/api/requests.go b/internal/app/agent/api/requests.go index fa2c4daa..81e5f2d6 100644 --- a/internal/app/agent/api/requests.go +++ b/internal/app/agent/api/requests.go @@ -41,8 +41,8 @@ type addConfigReq struct { func (req addConfigReq) validate() error { if req.agent.server.port == "" || - req.agent.thing.id == "" || - req.agent.thing.key == "" || + req.agent.mqtt.username == "" || + req.agent.mqtt.password == "" || req.agent.channels.control == "" || req.agent.channels.data == "" || req.agent.log.level == "" || diff --git a/internal/app/agent/api/responses.go b/internal/app/agent/api/responses.go index d6146907..6bf4ae41 100644 --- a/internal/app/agent/api/responses.go +++ b/internal/app/agent/api/responses.go @@ -13,8 +13,3 @@ type execRes struct { Name string `json:"n"` Value string `json:"vs"` } - -type configRes struct { - agent agentConf - file string -} diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 3fad9dc8..41710fe5 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -5,15 +5,18 @@ package agent import ( "encoding/base64" + "encoding/json" "fmt" "os/exec" + "sort" "strings" + "time" paho "github.com/eclipse/paho.mqtt.golang" "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" "github.com/mainflux/agent/pkg/edgex" - export "github.com/mainflux/export/pkg/config" + exp "github.com/mainflux/export/pkg/config" "github.com/mainflux/mainflux/errors" log "github.com/mainflux/mainflux/logger" "github.com/mainflux/senml" @@ -25,6 +28,11 @@ const ( Hearbeat = "heartbeat.*" Commands = "commands" Config = "config" + + view = "view" + save = "save" + + export = "export" ) var ( @@ -42,6 +50,9 @@ var ( // errNatsSubscribing indicates problem with sub to topic for heartbeat errNatsSubscribing = errors.New("failed to subscribe to heartbeat topic") + + // errNoSuchService indicates service not supported + errNoSuchService = errors.New("no such service") ) // Service specifies API for publishing messages and subscribing to topics. @@ -62,7 +73,7 @@ type Service interface { ServiceConfig(uuid, cmdStr string) error // Services returns service list - Services() map[string]*services.Service + Services() []ServiceInfo // Publish message Publish(string, string) error @@ -70,13 +81,19 @@ type Service interface { var _ Service = (*agent)(nil) +type ServiceInfo struct { + Name string + LastSeen time.Time + Status string +} + type agent struct { mqttClient paho.Client config *config.Config edgexClient edgex.Client logger log.Logger nats *nats.Conn - servs map[string]*services.Service + svcs map[string]*services.Service } // New returns agent service implementation. @@ -87,7 +104,7 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log config: cfg, nats: nc, logger: logger, - servs: make(map[string]*services.Service), + svcs: make(map[string]*services.Service), } _, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) { @@ -97,16 +114,16 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log ag.logger.Error(fmt.Sprintf("Failed: Subject has incorrect length %s" + sub)) return } - servname := tok[1] + svcname := tok[1] // Service name is extracted from the subtopic // if there is multiple instances of the same service // we will have to add another distinction - if _, ok := ag.servs[servname]; !ok { - serv := services.NewService(servname) - ag.servs[servname] = serv - ag.logger.Info(fmt.Sprintf("Services '%s' registered", servname)) + if _, ok := ag.svcs[svcname]; !ok { + svc := services.NewService(svcname) + ag.svcs[svcname] = svc + ag.logger.Info(fmt.Sprintf("Services '%s' registered", svcname)) } - serv := ag.servs[servname] + serv := ag.svcs[svcname] serv.Update() }) @@ -168,41 +185,75 @@ func (a *agent) Control(uuid, cmdStr string) error { return err } - payload, err := encodeSenML(uuid, cmd, resp) - if err != nil { - return err - } - - if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil { - return err - } - - return nil + return a.processResponse(uuid, cmd, resp) } // Message for this command -// "[{"bn":"1:", "n":"config", "vs":"export, /configs/export/config.toml, config_file_content"}]" +// [{"bn":"1:", "n":"services", "vs":"view"}] +// [{"bn":"1:", "n":"config", "vs":"save, export, filename, filecontent"}] // config_file_content is base64 encoded marshaled structure representing service conf // Example of creation: // b, _ := toml.Marshal(cfg) // config_file_content := base64.StdEncoding.EncodeToString(b) func (a *agent) ServiceConfig(uuid, cmdStr string) error { cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",") - if len(cmdArgs) < 3 { + if len(cmdArgs) < 1 { return errInvalidCommand } + resp := "" + cmd := cmdArgs[0] + switch cmd { + case view: + services, err := json.Marshal(a.Services()) + if err != nil { + return err + } + resp = string(services) + case save: + if len(cmdArgs) < 4 { + return errInvalidCommand + } + service := cmdArgs[1] + fileName := cmdArgs[2] + fileCont := cmdArgs[3] + if err := a.saveConfig(service, fileName, fileCont); err != nil { + return err + } + } + return a.processResponse(uuid, cmd, resp) +} - service := cmdArgs[0] - fileName := cmdArgs[1] - fileCont, err := base64.StdEncoding.DecodeString(cmdArgs[2]) +func (a *agent) processResponse(uuid, cmd, resp string) error { + payload, err := encodeSenML(uuid, cmd, resp) if err != nil { return err } - c := &export.Config{} + if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil { + return err + } + return nil +} + +func (a *agent) saveConfig(service, fileName, fileCont string) error { + switch service { + case export: + content, err := base64.StdEncoding.DecodeString(fileCont) + if err != nil { + return err + } + c := &exp.Config{} + if err := c.ReadBytes([]byte(content)); err != nil { + return err + } + c.File = fileName + if err := c.Save(); err != nil { + return err + } + + default: + return errNoSuchService + } - c.ReadBytes([]byte(fileCont)) - c.File = fileName - c.Save() return a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte("")) } @@ -214,8 +265,22 @@ func (a *agent) Config() config.Config { return *a.config } -func (a *agent) Services() map[string]*services.Service { - return a.servs +func (a *agent) Services() []ServiceInfo { + services := []ServiceInfo{} + keys := []string{} + for k := range a.svcs { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + service := ServiceInfo{ + Name: a.svcs[key].Name, + LastSeen: a.svcs[key].LastSeen, + Status: a.svcs[key].Status, + } + services = append(services, service) + } + return services } func (a *agent) Publish(crtlChan, payload string) error { diff --git a/internal/app/agent/services/service.go b/internal/app/agent/services/service.go index 8d665342..c02f9fd4 100644 --- a/internal/app/agent/services/service.go +++ b/internal/app/agent/services/service.go @@ -5,20 +5,18 @@ import ( "time" ) -type Status string - const ( timeout = 3 interval = 10000 - Online Status = "online" - Offline Status = "offline" + online = "online" + offline = "offline" ) type Service struct { Name string LastSeen time.Time - Status Status + Status string counter int done chan bool @@ -29,7 +27,7 @@ type Service struct { func NewService(name string) *Service { ticker := time.NewTicker(interval * time.Millisecond) done := make(chan bool) - s := Service{Name: name, Status: Online, done: done, counter: timeout, ticker: ticker} + s := Service{Name: name, Status: online, done: done, counter: timeout, ticker: ticker} s.Listen() return &s } @@ -44,7 +42,7 @@ func (s *Service) Listen() { s.mu.Lock() s.counter = s.counter - 1 if s.counter == 0 { - s.Status = Offline + s.Status = offline s.counter = timeout } s.mu.Unlock() @@ -58,5 +56,5 @@ func (s *Service) Update() { s.mu.Lock() defer s.mu.Unlock() s.counter = timeout - s.Status = Online + s.Status = online } diff --git a/internal/pkg/bootstrap/bootstrap.go b/internal/pkg/bootstrap/bootstrap.go index 4841bb71..123bf5b6 100644 --- a/internal/pkg/bootstrap/bootstrap.go +++ b/internal/pkg/bootstrap/bootstrap.go @@ -14,10 +14,13 @@ import ( "github.com/mainflux/agent/internal/app/agent" "github.com/mainflux/agent/internal/pkg/config" + export "github.com/mainflux/export/pkg/config" log "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/things" ) +const exportConfigFile = "/configs/export/config.toml" + // Config represents the parameters for boostraping type Config struct { URL string @@ -36,11 +39,12 @@ type deviceConfig struct { } type infraConfig struct { - LogLevel string `json:"log_level"` - HTTPPort string `json:"http_port"` - MqttURL string `json:"mqtt_url"` - EdgexURL string `json:"edgex_url"` - NatsURL string `json:"nats_url"` + LogLevel string `json:"log_level"` + HTTPPort string `json:"http_port"` + MqttURL string `json:"mqtt_url"` + EdgexURL string `json:"edgex_url"` + NatsURL string `json:"nats_url"` + ExportConfig export.Config `json:"export_config"` } // Bootstrap - Retrieve device config @@ -80,6 +84,16 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { if err := json.Unmarshal([]byte(dc.Content), &ic); err != nil { return err } + econf := &ic.ExportConfig + if econf != nil { + if econf.File == "" { + econf.File = exportConfigFile + } + logger.Info(fmt.Sprintf("Saving export config file %s", econf.File)) + if err := econf.Save(); err != nil { + logger.Error(fmt.Sprintf("Failed to save export config file %s", err)) + } + } if len(dc.MainfluxChannels) < 2 { return agent.ErrMalformedEntity @@ -97,19 +111,19 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { NatsURL: ic.NatsURL, } - tc := config.ThingConf{ - ID: dc.MainfluxID, - Key: dc.MainfluxKey, - } cc := config.ChanConf{ Control: ctrlChan, Data: dataChan, } ec := config.EdgexConf{URL: ic.EdgexURL} lc := config.LogConf{Level: ic.LogLevel} - mc := config.MQTTConf{URL: ic.MqttURL} + mc := config.MQTTConf{ + URL: ic.MqttURL, + Password: dc.MainfluxKey, + Username: dc.MainfluxID, + } - c := config.New(sc, tc, cc, ec, lc, mc, file) + c := config.New(sc, cc, ec, lc, mc, file) return c.Save() } diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 562c73bd..9da78c65 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -4,6 +4,7 @@ package config import ( + "crypto/tls" "fmt" "io/ioutil" @@ -15,11 +16,6 @@ type ServerConf struct { NatsURL string `toml:"nats_url"` } -type ThingConf struct { - ID string `toml:"id"` - Key string `toml:"key"` -} - type ChanConf struct { Control string `toml:"control"` Data string `toml:"data"` @@ -34,13 +30,23 @@ type LogConf struct { } type MQTTConf struct { - URL string `toml:"url"` + URL string `json:"url" toml:"url"` + Username string `json:"username" toml:"username" mapstructure:"username"` + Password string `json:"password" toml:"password" mapstructure:"password"` + MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"` + SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"` + Retain bool `json:"retain" toml:"retain" mapstructure:"retain"` + QoS int `json:"qos" toml:"qos" mapstructure:"qos"` + CAPath string `json:"ca_path" toml:"ca_path" mapstructure:"ca_path"` + CertPath string `json:"cert_path" toml:"cert_path" mapstructure:"cert_path"` + PrivKeyPath string `json:"priv_key_path" toml:"priv_key_path" mapstructure:"priv_key_path"` + CA []byte `json:"-" toml:"-"` + Cert tls.Certificate `json:"-" toml:"-"` } // Config struct of Mainflux Agent type AgentConf struct { Server ServerConf `toml:"server"` - Thing ThingConf `toml:"thing"` Channels ChanConf `toml:"channels"` Edgex EdgexConf `toml:"edgex"` Log LogConf `toml:"log"` @@ -52,10 +58,9 @@ type Config struct { File string } -func New(sc ServerConf, tc ThingConf, cc ChanConf, ec EdgexConf, lc LogConf, mc MQTTConf, file string) *Config { +func New(sc ServerConf, cc ChanConf, ec EdgexConf, lc LogConf, mc MQTTConf, file string) *Config { ac := AgentConf{ Server: sc, - Thing: tc, Channels: cc, Edgex: ec, Log: lc, diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/conn/conn.go similarity index 77% rename from internal/pkg/mqtt/sub.go rename to internal/pkg/conn/conn.go index 51c4afbb..1e33ea3b 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/conn/conn.go @@ -1,7 +1,7 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 -package mqtt +package conn import ( "fmt" @@ -14,19 +14,18 @@ import ( "github.com/nats-io/go-nats" "robpike.io/filter" - paho "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" ) -type cmdType string - const ( reqTopic = "req" servTopic = "services" commands = "commands" - control cmdType = "control" - exec cmdType = "exec" - config cmdType = "config" + control = "control" + exec = "exec" + config = "config" + service = "service" ) var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages/services(/[^?]*)?(\?.*)?$`) @@ -41,13 +40,13 @@ type MqttBroker interface { type broker struct { svc agent.Service - client paho.Client + client mqtt.Client logger logger.Logger nats *nats.Conn } // NewBroker returns new MQTT broker instance. -func NewBroker(svc agent.Service, client paho.Client, nats *nats.Conn, log logger.Logger) MqttBroker { +func NewBroker(svc agent.Service, client mqtt.Client, nats *nats.Conn, log logger.Logger) MqttBroker { return &broker{ svc: svc, client: client, @@ -74,7 +73,7 @@ func (b *broker) Subscribe(topic string) error { } // handleNatsMsg triggered when new message is received on MQTT broker -func (b *broker) handleNatsMsg(mc paho.Client, msg paho.Message) { +func (b *broker) handleNatsMsg(mc mqtt.Client, msg mqtt.Message) { if topic := extractNatsTopic(msg.Topic()); topic != "" { b.nats.Publish(topic, msg.Payload()) } @@ -95,14 +94,18 @@ func extractNatsTopic(topic string) string { } // handleMsg triggered when new message is received on MQTT broker -func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { +func (b *broker) handleMsg(mc mqtt.Client, msg mqtt.Message) { sm, err := senml.Decode(msg.Payload(), senml.JSON) if err != nil { b.logger.Warn(fmt.Sprintf("SenML decode failed: %s", err)) return } - cmdType := cmdType(sm.Records[0].Name) + if len(sm.Records) == 0 { + b.logger.Error(fmt.Sprintf("SenML payload empty: `%s`", string(msg.Payload()))) + return + } + cmdType := sm.Records[0].Name cmdStr := *sm.Records[0].StringValue uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":") @@ -118,10 +121,15 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } case config: - b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + b.logger.Info(fmt.Sprintf("Config service for uuid %s and command string %s", uuid, cmdStr)) if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } + case service: + b.logger.Info(fmt.Sprintf("Services view for uuid %s and command string %s", uuid, cmdStr)) + if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { + b.logger.Warn(fmt.Sprintf("Services view operation failed: %s", err)) + } } }