Skip to content

Commit

Permalink
NOISSUE - add view services via mqtt (#19)
Browse files Browse the repository at this point in the history
* add view services via mqtt

Signed-off-by: Mirko Teodorovic <[email protected]>

* small fix

Signed-off-by: Mirko Teodorovic <[email protected]>

* minor changes

Signed-off-by: Mirko Teodorovic <[email protected]>

* update comment

Signed-off-by: Mirko Teodorovic <[email protected]>

* add different endpoint for services view

Signed-off-by: Mirko Teodorovic <[email protected]>

* dont use pointers

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix comments

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix errror return

Signed-off-by: Mirko Teodorovic <[email protected]>

* inline function

Signed-off-by: Mirko Teodorovic <[email protected]>

* small changes

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove cmdType type

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove pointers

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove parenthesis

Signed-off-by: Mirko Teodorovic <[email protected]>

* add service info type

Signed-off-by: Mirko Teodorovic <[email protected]>
  • Loading branch information
mteodor authored Feb 4, 2020
1 parent 75ac451 commit 22484cb
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 45 deletions.
3 changes: 1 addition & 2 deletions internal/app/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
log "github.com/mainflux/mainflux/logger"
)
Expand Down Expand Up @@ -99,7 +98,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) {
return lm.svc.ServiceConfig(uuid, cmdStr)
}

func (lm loggingMiddleware) Services() map[string]*services.Service {
func (lm loggingMiddleware) Services() []agent.ServiceInfo {
defer func(begin time.Time) {
message := fmt.Sprintf("Method services took %s to complete", time.Since(begin))
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
Expand Down
3 changes: 1 addition & 2 deletions internal/app/agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/go-kit/kit/metrics"
"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
)

Expand Down Expand Up @@ -76,7 +75,7 @@ func (ms *metricsMiddleware) Config() config.Config {
return ms.svc.Config()
}

func (ms *metricsMiddleware) Services() map[string]*services.Service {
func (ms *metricsMiddleware) Services() []agent.ServiceInfo {
defer func(begin time.Time) {
ms.counter.With("method", "services").Add(1)
ms.latency.With("method", "services").Observe(time.Since(begin).Seconds())
Expand Down
125 changes: 95 additions & 30 deletions internal/app/agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package agent

import (
"encoding/base64"
"encoding/json"
"fmt"
"os/exec"
"sort"
"strings"
"time"

paho "github.com/eclipse/paho.mqtt.golang"
"github.com/mainflux/agent/internal/app/agent/services"
Expand All @@ -25,6 +28,9 @@ const (
Hearbeat = "heartbeat.*"
Commands = "commands"
Config = "config"

view = "view"
save = "save"
)

var (
Expand All @@ -42,6 +48,9 @@ var (

// errNatsSubscribing indicates problem with sub to topic for heartbeat
errNatsSubscribing = errors.New("failed to subscribe to heartbeat topic")

// errNoSuchService indicates service not supported
errNoSuchService = errors.New("no such service")
)

// Service specifies API for publishing messages and subscribing to topics.
Expand All @@ -62,21 +71,27 @@ type Service interface {
ServiceConfig(uuid, cmdStr string) error

// Services returns service list
Services() map[string]*services.Service
Services() []ServiceInfo

// Publish message
Publish(string, string) error
}

var _ Service = (*agent)(nil)

type ServiceInfo struct {
Name string
LastSeen time.Time
Status string
}

type agent struct {
mqttClient paho.Client
config *config.Config
edgexClient edgex.Client
logger log.Logger
nats *nats.Conn
servs map[string]*services.Service
svcs map[string]*services.Service
}

// New returns agent service implementation.
Expand All @@ -87,7 +102,7 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log
config: cfg,
nats: nc,
logger: logger,
servs: make(map[string]*services.Service),
svcs: make(map[string]*services.Service),
}

_, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) {
Expand All @@ -97,16 +112,16 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log
ag.logger.Error(fmt.Sprintf("Failed: Subject has incorrect length %s" + sub))
return
}
servname := tok[1]
svcname := tok[1]
// Service name is extracted from the subtopic
// if there is multiple instances of the same service
// we will have to add another distinction
if _, ok := ag.servs[servname]; !ok {
serv := services.NewService(servname)
ag.servs[servname] = serv
ag.logger.Info(fmt.Sprintf("Services '%s' registered", servname))
if _, ok := ag.svcs[svcname]; !ok {
svc := services.NewService(svcname)
ag.svcs[svcname] = svc
ag.logger.Info(fmt.Sprintf("Services '%s' registered", svcname))
}
serv := ag.servs[servname]
serv := ag.svcs[svcname]
serv.Update()
})

Expand Down Expand Up @@ -168,41 +183,77 @@ func (a *agent) Control(uuid, cmdStr string) error {
return err
}

payload, err := encodeSenML(uuid, cmd, resp)
if err != nil {
return err
}

if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil {
return err
}

return nil
return a.processResponse(uuid, cmd, resp)
}

// Message for this command
// "[{"bn":"1:", "n":"config", "vs":"export, /configs/export/config.toml, config_file_content"}]"
// [{"bn":"1:", "n":"services", "vs":"view"}]
// [{"bn":"1:", "n":"config", "vs":"save, export, filename, filecontent"}]
// config_file_content is base64 encoded marshaled structure representing service conf
// Example of creation:
// b, _ := toml.Marshal(cfg)
// config_file_content := base64.StdEncoding.EncodeToString(b)
func (a *agent) ServiceConfig(uuid, cmdStr string) error {
cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",")
if len(cmdArgs) < 3 {
if len(cmdArgs) < 1 {
return errInvalidCommand
}
resp := ""
cmd := cmdArgs[0]

service := cmdArgs[0]
fileName := cmdArgs[1]
fileCont, err := base64.StdEncoding.DecodeString(cmdArgs[2])
switch cmd {
case view:
services, err := json.Marshal(a.Services())
if err != nil {
return err
}
resp = string(services)
case save:
if len(cmdArgs) < 4 {
return errInvalidCommand
}
service := cmdArgs[1]
fileName := cmdArgs[2]
fileCont := cmdArgs[3]
if err := a.saveConfig(service, fileName, fileCont); err != nil {
return err
}
}
return a.processResponse(uuid, cmd, resp)
}

func (a *agent) processResponse(uuid, cmd, resp string) error {
payload, err := encodeSenML(uuid, cmd, resp)
if err != nil {
return err
}
c := &export.Config{}
if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil {
return err
}
return nil
}

func (a *agent) saveConfig(service, fileName, fileCont string) error {
switch service {
case "export":
content, err := base64.StdEncoding.DecodeString(fileCont)
if err != nil {
return err
}

c := &export.Config{}
if err := c.ReadBytes([]byte(content)); err != nil {
return err
}
c.File = fileName
if err := c.Save(); err != nil {
return err
}

default:
return errNoSuchService
}

c.ReadBytes([]byte(fileCont))
c.File = fileName
c.Save()
return a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte(""))
}

Expand All @@ -214,8 +265,22 @@ func (a *agent) Config() config.Config {
return *a.config
}

func (a *agent) Services() map[string]*services.Service {
return a.servs
func (a *agent) Services() []ServiceInfo {
services := []ServiceInfo{}
keys := []string{}
for k := range a.svcs {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
service := ServiceInfo{
Name: a.svcs[key].Name,
LastSeen: a.svcs[key].LastSeen,
Status: a.svcs[key].Status,
}
services = append(services, service)
}
return services
}

func (a *agent) Publish(crtlChan, payload string) error {
Expand Down
8 changes: 3 additions & 5 deletions internal/app/agent/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@ import (
"time"
)

type Status string

const (
timeout = 3
interval = 10000

Online Status = "online"
Offline Status = "offline"
Online = "online"
Offline = "offline"
)

type Service struct {
Name string
LastSeen time.Time
Status Status
Status string

counter int
done chan bool
Expand Down
16 changes: 10 additions & 6 deletions internal/pkg/mqtt/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ import (
paho "github.com/eclipse/paho.mqtt.golang"
)

type cmdType string

const (
reqTopic = "req"
servTopic = "services"
commands = "commands"

control cmdType = "control"
exec cmdType = "exec"
config cmdType = "config"
control = "control"
exec = "exec"
config = "config"
service = "service"
)

var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages/services(/[^?]*)?(\?.*)?$`)
Expand Down Expand Up @@ -102,7 +101,7 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) {
return
}

cmdType := cmdType(sm.Records[0].Name)
cmdType := sm.Records[0].Name
cmdStr := *sm.Records[0].StringValue
uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":")

Expand All @@ -122,6 +121,11 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) {
if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil {
b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err))
}
case service:
b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr))
if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil {
b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err))
}
}

}

0 comments on commit 22484cb

Please sign in to comment.