diff --git a/internal/app/agent/api/logging.go b/internal/app/agent/api/logging.go index e552aab5..60547798 100644 --- a/internal/app/agent/api/logging.go +++ b/internal/app/agent/api/logging.go @@ -8,7 +8,6 @@ import ( "time" "github.com/mainflux/agent/internal/app/agent" - "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" log "github.com/mainflux/mainflux/logger" ) @@ -99,7 +98,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) { return lm.svc.ServiceConfig(uuid, cmdStr) } -func (lm loggingMiddleware) Services() map[string]*services.Service { +func (lm loggingMiddleware) Services() []agent.ServiceInfo { defer func(begin time.Time) { message := fmt.Sprintf("Method services took %s to complete", time.Since(begin)) lm.logger.Info(fmt.Sprintf("%s without errors.", message)) diff --git a/internal/app/agent/api/metrics.go b/internal/app/agent/api/metrics.go index f6414305..9a361feb 100644 --- a/internal/app/agent/api/metrics.go +++ b/internal/app/agent/api/metrics.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/metrics" "github.com/mainflux/agent/internal/app/agent" - "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" ) @@ -76,7 +75,7 @@ func (ms *metricsMiddleware) Config() config.Config { return ms.svc.Config() } -func (ms *metricsMiddleware) Services() map[string]*services.Service { +func (ms *metricsMiddleware) Services() []agent.ServiceInfo { defer func(begin time.Time) { ms.counter.With("method", "services").Add(1) ms.latency.With("method", "services").Observe(time.Since(begin).Seconds()) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 3fad9dc8..50b6c762 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -5,9 +5,12 @@ package agent import ( "encoding/base64" + "encoding/json" "fmt" "os/exec" + "sort" "strings" + "time" paho "github.com/eclipse/paho.mqtt.golang" "github.com/mainflux/agent/internal/app/agent/services" @@ -25,6 +28,9 @@ const ( Hearbeat = "heartbeat.*" Commands = "commands" Config = "config" + + view = "view" + save = "save" ) var ( @@ -42,6 +48,9 @@ var ( // errNatsSubscribing indicates problem with sub to topic for heartbeat errNatsSubscribing = errors.New("failed to subscribe to heartbeat topic") + + // errNoSuchService indicates service not supported + errNoSuchService = errors.New("no such service") ) // Service specifies API for publishing messages and subscribing to topics. @@ -62,7 +71,7 @@ type Service interface { ServiceConfig(uuid, cmdStr string) error // Services returns service list - Services() map[string]*services.Service + Services() []ServiceInfo // Publish message Publish(string, string) error @@ -70,13 +79,19 @@ type Service interface { var _ Service = (*agent)(nil) +type ServiceInfo struct { + Name string + LastSeen time.Time + Status string +} + type agent struct { mqttClient paho.Client config *config.Config edgexClient edgex.Client logger log.Logger nats *nats.Conn - servs map[string]*services.Service + svcs map[string]*services.Service } // New returns agent service implementation. @@ -87,7 +102,7 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log config: cfg, nats: nc, logger: logger, - servs: make(map[string]*services.Service), + svcs: make(map[string]*services.Service), } _, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) { @@ -97,16 +112,16 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log ag.logger.Error(fmt.Sprintf("Failed: Subject has incorrect length %s" + sub)) return } - servname := tok[1] + svcname := tok[1] // Service name is extracted from the subtopic // if there is multiple instances of the same service // we will have to add another distinction - if _, ok := ag.servs[servname]; !ok { - serv := services.NewService(servname) - ag.servs[servname] = serv - ag.logger.Info(fmt.Sprintf("Services '%s' registered", servname)) + if _, ok := ag.svcs[svcname]; !ok { + svc := services.NewService(svcname) + ag.svcs[svcname] = svc + ag.logger.Info(fmt.Sprintf("Services '%s' registered", svcname)) } - serv := ag.servs[servname] + serv := ag.svcs[svcname] serv.Update() }) @@ -168,41 +183,77 @@ func (a *agent) Control(uuid, cmdStr string) error { return err } - payload, err := encodeSenML(uuid, cmd, resp) - if err != nil { - return err - } - - if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil { - return err - } - - return nil + return a.processResponse(uuid, cmd, resp) } // Message for this command -// "[{"bn":"1:", "n":"config", "vs":"export, /configs/export/config.toml, config_file_content"}]" +// [{"bn":"1:", "n":"services", "vs":"view"}] +// [{"bn":"1:", "n":"config", "vs":"save, export, filename, filecontent"}] // config_file_content is base64 encoded marshaled structure representing service conf // Example of creation: // b, _ := toml.Marshal(cfg) // config_file_content := base64.StdEncoding.EncodeToString(b) func (a *agent) ServiceConfig(uuid, cmdStr string) error { cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",") - if len(cmdArgs) < 3 { + if len(cmdArgs) < 1 { return errInvalidCommand } + resp := "" + cmd := cmdArgs[0] - service := cmdArgs[0] - fileName := cmdArgs[1] - fileCont, err := base64.StdEncoding.DecodeString(cmdArgs[2]) + switch cmd { + case view: + services, err := json.Marshal(a.Services()) + if err != nil { + return err + } + resp = string(services) + case save: + if len(cmdArgs) < 4 { + return errInvalidCommand + } + service := cmdArgs[1] + fileName := cmdArgs[2] + fileCont := cmdArgs[3] + if err := a.saveConfig(service, fileName, fileCont); err != nil { + return err + } + } + return a.processResponse(uuid, cmd, resp) +} + +func (a *agent) processResponse(uuid, cmd, resp string) error { + payload, err := encodeSenML(uuid, cmd, resp) if err != nil { return err } - c := &export.Config{} + if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil { + return err + } + return nil +} + +func (a *agent) saveConfig(service, fileName, fileCont string) error { + switch service { + case "export": + content, err := base64.StdEncoding.DecodeString(fileCont) + if err != nil { + return err + } + + c := &export.Config{} + if err := c.ReadBytes([]byte(content)); err != nil { + return err + } + c.File = fileName + if err := c.Save(); err != nil { + return err + } + + default: + return errNoSuchService + } - c.ReadBytes([]byte(fileCont)) - c.File = fileName - c.Save() return a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte("")) } @@ -214,8 +265,22 @@ func (a *agent) Config() config.Config { return *a.config } -func (a *agent) Services() map[string]*services.Service { - return a.servs +func (a *agent) Services() []ServiceInfo { + services := []ServiceInfo{} + keys := []string{} + for k := range a.svcs { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + service := ServiceInfo{ + Name: a.svcs[key].Name, + LastSeen: a.svcs[key].LastSeen, + Status: a.svcs[key].Status, + } + services = append(services, service) + } + return services } func (a *agent) Publish(crtlChan, payload string) error { diff --git a/internal/app/agent/services/service.go b/internal/app/agent/services/service.go index 8d665342..5ad6a786 100644 --- a/internal/app/agent/services/service.go +++ b/internal/app/agent/services/service.go @@ -5,20 +5,18 @@ import ( "time" ) -type Status string - const ( timeout = 3 interval = 10000 - Online Status = "online" - Offline Status = "offline" + Online = "online" + Offline = "offline" ) type Service struct { Name string LastSeen time.Time - Status Status + Status string counter int done chan bool diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index 51c4afbb..819def3d 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -17,16 +17,15 @@ import ( paho "github.com/eclipse/paho.mqtt.golang" ) -type cmdType string - const ( reqTopic = "req" servTopic = "services" commands = "commands" - control cmdType = "control" - exec cmdType = "exec" - config cmdType = "config" + control = "control" + exec = "exec" + config = "config" + service = "service" ) var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages/services(/[^?]*)?(\?.*)?$`) @@ -102,7 +101,7 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { return } - cmdType := cmdType(sm.Records[0].Name) + cmdType := sm.Records[0].Name cmdStr := *sm.Records[0].StringValue uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":") @@ -122,6 +121,11 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } + case service: + b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { + b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) + } } }