From 2b177f4c6e116aa482449935b004dc9d12552a82 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 16 Jan 2020 13:20:03 +0100 Subject: [PATCH] NOISSUE - Add heartbeat (#8) * add nats and support forwarding messages from mqtt to local nats Signed-off-by: Mirko Teodorovic * agent should work without nats Signed-off-by: Mirko Teodorovic * add heartbeat and app registering to agent Signed-off-by: Mirko Teodorovic * add logic for monitoring service status Signed-off-by: Mirko Teodorovic * add view application status Signed-off-by: Mirko Teodorovic * remove bin Signed-off-by: Mirko Teodorovic * add ticker for counting inactivity Signed-off-by: Mirko Teodorovic * add check for string length and minor fix Signed-off-by: Mirko Teodorovic * add logger and string length check Signed-off-by: Mirko Teodorovic * add logger and string length check Signed-off-by: Mirko Teodorovic * dont use caps for consts Signed-off-by: Mirko Teodorovic * update mod Signed-off-by: Mirko Teodorovic * refactor, move register into agent Signed-off-by: Mirko Teodorovic * refactor, move register into agent Signed-off-by: Mirko Teodorovic * refactor, move register into agent Signed-off-by: Mirko Teodorovic * refactor, move register into agent Signed-off-by: Mirko Teodorovic * fix string index check Signed-off-by: Mirko Teodorovic * fix error handling Signed-off-by: Mirko Teodorovic * change visibility of error Signed-off-by: Mirko Teodorovic * optimize topic parsing Signed-off-by: Mirko Teodorovic * small fix Signed-off-by: Mirko Teodorovic * chane logging messages Signed-off-by: Mirko Teodorovic * inline and string index check correction Signed-off-by: Mirko Teodorovic * applications -> services Signed-off-by: Mirko Teodorovic * application ->services Signed-off-by: Mirko Teodorovic * remove binary Signed-off-by: Mirko Teodorovic * change method name Signed-off-by: Mirko Teodorovic * change labels Signed-off-by: Mirko Teodorovic --- cmd/main.go | 27 ++++++++--- go.mod | 4 ++ go.sum | 8 ++++ internal/app/agent/api/endpoints.go | 8 +++- internal/app/agent/api/logging.go | 18 ++++++-- internal/app/agent/api/metrics.go | 22 ++++++--- internal/app/agent/api/transport.go | 6 +++ internal/app/agent/service.go | 58 +++++++++++++++++++++--- internal/app/agent/services/service.go | 62 ++++++++++++++++++++++++++ internal/pkg/mqtt/sub.go | 57 ++++++++++++++++++++--- 10 files changed, 241 insertions(+), 29 deletions(-) create mode 100644 internal/app/agent/services/service.go diff --git a/cmd/main.go b/cmd/main.go index 009575d8..8ed0654f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,6 +18,7 @@ import ( "github.com/mainflux/agent/pkg/edgex" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" + nats "github.com/nats-io/go-nats" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -37,6 +38,7 @@ const ( defDataChan = "ea353dac-0298-4fbb-9e5d-501e3699949c" defEncryption = "false" defConfigFile = "config.toml" + defNatsURL = nats.DefaultURL envConfigFile = "MF_AGENT_CONFIG_FILE" envLogLevel = "MF_AGENT_LOG_LEVEL" @@ -53,6 +55,7 @@ const ( envCtrlChan = "MF_AGENT_CONTROL_CHANNEL" envDataChan = "MF_AGENT_DATA_CHANNEL" envEncryption = "MF_AGENT_ENCRYPTION" + envNatsURL = "MF_AGENT_NATS_URL" ) func main() { @@ -66,10 +69,19 @@ func main() { log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error())) } + nc, err := nats.Connect(cfg.Agent.Server.NatsURL) + if err != nil { + log.Fatalf(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL)) + } + defer nc.Close() + mqttClient := connectToMQTTBroker(cfg.Agent.MQTT.URL, cfg.Agent.Thing.ID, cfg.Agent.Thing.Key, logger) edgexClient := edgex.NewClient(cfg.Agent.Edgex.URL, logger) - svc := agent.New(mqttClient, &cfg, edgexClient, logger) + svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger) + if err != nil { + log.Fatalf(fmt.Sprintf("Error in agent service: %s", err.Error())) + } svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( @@ -87,7 +99,7 @@ func main() { Help: "Total duration of requests in microseconds.", }, []string{"method"}), ) - go subscribeToMQTTBroker(svc, mqttClient, cfg.Agent.Channels.Control, logger) + go subscribeToMQTTBroker(svc, mqttClient, cfg.Agent.Channels.Control, nc, logger) errs := make(chan error, 3) @@ -122,7 +134,10 @@ func loadConfig(logger logger.Logger) (config.Config, error) { return config.Config{}, err } - sc := config.ServerConf{Port: mainflux.Env(envLogLevel, defLogLevel)} + sc := config.ServerConf{ + NatsURL: mainflux.Env(envNatsURL, defNatsURL), + Port: mainflux.Env(envLogLevel, defLogLevel), + } tc := config.ThingConf{ ID: mainflux.Env(envThingID, defThingID), Key: mainflux.Env(envThingKey, defThingKey), @@ -170,9 +185,9 @@ func connectToMQTTBroker(mqttURL, thingID, thingKey string, logger logger.Logger return client } -func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, logger logger.Logger) { - broker := mqtt.NewBroker(svc, mc, logger) - topic := fmt.Sprintf("channels/%s/messages/req", ctrlChan) +func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) { + broker := mqtt.NewBroker(svc, mc, nc, logger) + topic := fmt.Sprintf("channels/%s/messages", ctrlChan) if err := broker.Subscribe(topic); err != nil { logger.Error(fmt.Sprintf("Failed to subscribe to MQTT broker: %s", err.Error())) os.Exit(1) diff --git a/go.mod b/go.mod index 726c4ec0..1503942e 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,13 @@ require ( github.com/edgexfoundry/go-mod-core-contracts v0.1.35 github.com/go-kit/kit v0.9.0 github.com/go-zoo/bone v1.3.0 + github.com/gogo/protobuf v1.3.1 github.com/mainflux/mainflux v0.0.0-20191129194728-8c4da8503963 github.com/mainflux/senml v1.0.0 + github.com/nats-io/gnatsd v1.4.1 // indirect + github.com/nats-io/go-nats v1.6.0 github.com/pelletier/go-toml v1.4.0 github.com/prometheus/client_golang v1.2.1 github.com/stretchr/testify v1.4.0 + robpike.io/filter v0.0.0-20150108201509-2984852a2183 ) diff --git a/go.sum b/go.sum index 940e2f06..c5f900f9 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,7 @@ github.com/gocql/gocql v0.0.0-20181106112037-68ae1e384be4/go.mod h1:4Fw1eo5iaEhD github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= @@ -133,7 +134,11 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44= +github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= +github.com/nats-io/go-nats v1.6.0 h1:FznPwMfrVwGnSCh7JTXyJDRW0TIkD4Tr+M1LPJt9T70= github.com/nats-io/go-nats v1.6.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= +github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs= github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -214,6 +219,7 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -280,4 +286,6 @@ gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +robpike.io/filter v0.0.0-20150108201509-2984852a2183 h1:b7Y5VfvTcuK1JCT6YKDIaw5w8j/AYBz6DkX/pStQCsM= +robpike.io/filter v0.0.0-20150108201509-2984852a2183/go.mod h1:JQLSCVDQbISDYGuIQqGa40P6NsyycM3cVAiOKl0tjfI= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/internal/app/agent/api/endpoints.go b/internal/app/agent/api/endpoints.go index 4d1073ae..95bdc936 100644 --- a/internal/app/agent/api/endpoints.go +++ b/internal/app/agent/api/endpoints.go @@ -99,7 +99,7 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint { func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint { return func(_ context.Context, request interface{}) (interface{}, error) { - c := svc.ViewConfig() + c := svc.Config() sc := serverConf{port: c.Agent.Server.Port} cc := chanConf{ @@ -126,3 +126,9 @@ func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint { return res, nil } } + +func viewServicesEndpoint(svc agent.Service) endpoint.Endpoint { + return func(_ context.Context, request interface{}) (interface{}, error) { + return svc.Services(), nil + } +} diff --git a/internal/app/agent/api/logging.go b/internal/app/agent/api/logging.go index bf3bca46..0e0f1457 100644 --- a/internal/app/agent/api/logging.go +++ b/internal/app/agent/api/logging.go @@ -8,6 +8,7 @@ import ( "time" "github.com/mainflux/agent/internal/app/agent" + "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" log "github.com/mainflux/mainflux/logger" ) @@ -65,7 +66,7 @@ func (lm loggingMiddleware) Control(uuid, cmd string) (err error) { func (lm loggingMiddleware) AddConfig(c config.Config) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method AddConfig took %s to complete", time.Since(begin)) + message := fmt.Sprintf("Method add_config took %s to complete", time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -76,11 +77,20 @@ func (lm loggingMiddleware) AddConfig(c config.Config) (err error) { return lm.svc.AddConfig(c) } -func (lm loggingMiddleware) ViewConfig() config.Config { +func (lm loggingMiddleware) Config() config.Config { defer func(begin time.Time) { - message := fmt.Sprintf("Method ViewConfig took %s to complete", time.Since(begin)) + message := fmt.Sprintf("Method config took %s to complete", time.Since(begin)) lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.ViewConfig() + return lm.svc.Config() +} + +func (lm loggingMiddleware) Services() map[string]*services.Service { + defer func(begin time.Time) { + message := fmt.Sprintf("Method services took %s to complete", time.Since(begin)) + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.Services() } diff --git a/internal/app/agent/api/metrics.go b/internal/app/agent/api/metrics.go index 20714cda..7d3d6e61 100644 --- a/internal/app/agent/api/metrics.go +++ b/internal/app/agent/api/metrics.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/metrics" "github.com/mainflux/agent/internal/app/agent" + "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" ) @@ -57,19 +58,28 @@ func (ms *metricsMiddleware) AddConfig(ec config.Config) error { return ms.svc.AddConfig(ec) } -func (ms *metricsMiddleware) ViewConfig() config.Config { +func (ms *metricsMiddleware) Config() config.Config { defer func(begin time.Time) { - ms.counter.With("method", "view_config").Add(1) - ms.latency.With("method", "view_config").Observe(time.Since(begin).Seconds()) + ms.counter.With("method", "config").Add(1) + ms.latency.With("method", "config").Observe(time.Since(begin).Seconds()) }(time.Now()) - return ms.svc.ViewConfig() + return ms.svc.Config() +} + +func (ms *metricsMiddleware) Services() map[string]*services.Service { + defer func(begin time.Time) { + ms.counter.With("method", "services").Add(1) + ms.latency.With("method", "services").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.Services() } func (ms *metricsMiddleware) Publish(topic, payload string) error { defer func(begin time.Time) { - ms.counter.With("method", "execute").Add(1) - ms.latency.With("method", "execute").Observe(time.Since(begin).Seconds()) + ms.counter.With("method", "publish").Add(1) + ms.latency.With("method", "publish").Observe(time.Since(begin).Seconds()) }(time.Now()) return ms.svc.Publish(topic, payload) diff --git a/internal/app/agent/api/transport.go b/internal/app/agent/api/transport.go index 71f16d79..7c19fa97 100644 --- a/internal/app/agent/api/transport.go +++ b/internal/app/agent/api/transport.go @@ -45,6 +45,12 @@ func MakeHandler(svc agent.Service) http.Handler { encodeResponse, )) + r.Get("/services", kithttp.NewServer( + viewServicesEndpoint(svc), + decodeRequest, + encodeResponse, + )) + r.GetFunc("/version", mainflux.Version("agent")) r.Handle("/metrics", promhttp.Handler()) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 69e0140a..3622391f 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -4,20 +4,23 @@ package agent import ( - "errors" "fmt" "os/exec" "strings" paho "github.com/eclipse/paho.mqtt.golang" + "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" "github.com/mainflux/agent/pkg/edgex" + "github.com/mainflux/mainflux/errors" log "github.com/mainflux/mainflux/logger" "github.com/mainflux/senml" + "github.com/nats-io/go-nats" ) const ( - Path = "./config.toml" + Path = "./config.toml" + Hearbeat = "heartbeat.*" ) var ( @@ -32,6 +35,9 @@ var ( // errUnknownCommand indicates that command is not found errUnknownCommand = errors.New("Unknown command") + + // errNatsSubscribing indicates problem with sub to topic for heartbeat + errNatsSubscribing = errors.New("failed to subscribe to heartbeat topic") ) // Service specifies API for publishing messages and subscribing to topics. @@ -45,8 +51,11 @@ type Service interface { // Update configuration file AddConfig(config.Config) error - // View returns Config struct created from config file - ViewConfig() config.Config + // Config returns Config struct created from config file + Config() config.Config + + // Services returns service list + Services() map[string]*services.Service // Publish message Publish(string, string) error @@ -59,16 +68,47 @@ type agent struct { config *config.Config edgexClient edgex.Client logger log.Logger + nats *nats.Conn + servs map[string]*services.Service } // New returns agent service implementation. -func New(mc paho.Client, cfg *config.Config, ec edgex.Client, logger log.Logger) Service { - return &agent{ +func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, logger log.Logger) (Service, errors.Error) { + ag := &agent{ mqttClient: mc, edgexClient: ec, config: cfg, + nats: nc, logger: logger, + servs: make(map[string]*services.Service), + } + + _, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) { + sub := msg.Subject + tok := strings.Split(sub, ".") + if len(tok) < 2 { + ag.logger.Error(fmt.Sprintf("Failed: Subject has incorrect length %s" + sub)) + return + } + servname := tok[1] + // Service name is extracted from the subtopic + // if there is multiple instances of the same service + // we will have to add another distinction + if _, ok := ag.servs[servname]; !ok { + serv := services.NewService(servname) + ag.servs[servname] = serv + ag.logger.Info(fmt.Sprintf("Services '%s' registered", servname)) + } + serv := ag.servs[servname] + serv.Update() + }) + + if err != nil { + return ag, errors.Wrap(errNatsSubscribing, err) } + + return ag, nil + } func (a *agent) Execute(uuid, cmd string) (string, error) { @@ -137,10 +177,14 @@ func (a *agent) AddConfig(c config.Config) error { return c.Save() } -func (a *agent) ViewConfig() config.Config { +func (a *agent) Config() config.Config { return *a.config } +func (a *agent) Services() map[string]*services.Service { + return a.servs +} + func (a *agent) Publish(crtlChan, payload string) error { topic := fmt.Sprintf("channels/%s/messages/res", crtlChan) token := a.mqttClient.Publish(topic, 0, false, payload) diff --git a/internal/app/agent/services/service.go b/internal/app/agent/services/service.go new file mode 100644 index 00000000..8d665342 --- /dev/null +++ b/internal/app/agent/services/service.go @@ -0,0 +1,62 @@ +package services + +import ( + "sync" + "time" +) + +type Status string + +const ( + timeout = 3 + interval = 10000 + + Online Status = "online" + Offline Status = "offline" +) + +type Service struct { + Name string + LastSeen time.Time + Status Status + + counter int + done chan bool + ticker *time.Ticker + mu sync.Mutex +} + +func NewService(name string) *Service { + ticker := time.NewTicker(interval * time.Millisecond) + done := make(chan bool) + s := Service{Name: name, Status: Online, done: done, counter: timeout, ticker: ticker} + s.Listen() + return &s +} + +func (s *Service) Listen() { + go func() { + for { + select { + case <-s.ticker.C: + // TODO - we can disable ticker when the status gets OFFLINE + // and on the next heartbeat enable it again + s.mu.Lock() + s.counter = s.counter - 1 + if s.counter == 0 { + s.Status = Offline + s.counter = timeout + } + s.mu.Unlock() + } + } + }() +} + +func (s *Service) Update() { + s.LastSeen = time.Now() + s.mu.Lock() + defer s.mu.Unlock() + s.counter = timeout + s.Status = Online +} diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index 22d0bf7b..affca44d 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -5,15 +5,32 @@ package mqtt import ( "fmt" + "regexp" "strings" "github.com/mainflux/agent/internal/app/agent" "github.com/mainflux/mainflux/logger" "github.com/mainflux/senml" + "github.com/nats-io/go-nats" + "robpike.io/filter" paho "github.com/eclipse/paho.mqtt.golang" ) +type cmdType string + +const ( + reqTopic = "req" + servTopic = "services" + commands = "commands" + + control cmdType = "control" + exec cmdType = "exec" + config cmdType = "config" +) + +var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages/services(/[^?]*)?(\?.*)?$`) + var _ MqttBroker = (*broker)(nil) // MqttBroker represents the MQTT broker. @@ -26,27 +43,57 @@ type broker struct { svc agent.Service client paho.Client logger logger.Logger + nats *nats.Conn } // NewBroker returns new MQTT broker instance. -func NewBroker(svc agent.Service, client paho.Client, log logger.Logger) MqttBroker { +func NewBroker(svc agent.Service, client paho.Client, nats *nats.Conn, log logger.Logger) MqttBroker { return &broker{ svc: svc, client: client, logger: log, + nats: nats, } } // Subscribe subscribes to the MQTT message broker func (b *broker) Subscribe(topic string) error { - s := b.client.Subscribe(topic, 0, b.handleMsg) + s := b.client.Subscribe(fmt.Sprintf("%s/%s", topic, reqTopic), 0, b.handleMsg) if err := s.Error(); s.Wait() && err != nil { return err } + if b.nats != nil { + n := b.client.Subscribe(fmt.Sprintf("%s/%s/#", topic, servTopic), 0, b.handleNatsMsg) + if err := n.Error(); n.Wait() && err != nil { + return err + } + } + return nil } +// handleNatsMsg triggered when new message is received on MQTT broker +func (b *broker) handleNatsMsg(mc paho.Client, msg paho.Message) { + if topic := extractNatsTopic(msg.Topic()); topic != "" { + b.nats.Publish(topic, msg.Payload()) + } +} + +func extractNatsTopic(topic string) string { + isEmpty := func(s string) bool { + return (len(s) == 0) + } + channelParts := channelPartRegExp.FindStringSubmatch(topic) + if len(channelParts) < 3 { + return "" + } + filtered := filter.Drop(strings.Split(channelParts[2], "/"), isEmpty).([]string) + natsTopic := strings.Join(filtered, ".") + + return fmt.Sprintf("%s.%s", commands, natsTopic) +} + // handleMsg triggered when new message is received on MQTT broker func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { sm, err := senml.Decode(msg.Payload(), senml.JSON) @@ -55,17 +102,17 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { return } - cmdType := sm.Records[0].Name + cmdType := cmdType(sm.Records[0].Name) cmdStr := *sm.Records[0].StringValue uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":") switch cmdType { - case "control": + case control: b.logger.Info(fmt.Sprintf("Control command for uuid %s and command string %s", uuid, cmdStr)) if err := b.svc.Control(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Control operation failed: %s", err)) } - case "exec": + case exec: b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) if _, err := b.svc.Execute(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err))