Skip to content

Commit

Permalink
NOISSUE - Add heartbeat (#8)
Browse files Browse the repository at this point in the history
* add nats and support forwarding messages from mqtt to local nats

Signed-off-by: Mirko Teodorovic <[email protected]>

* agent should work without nats

Signed-off-by: Mirko Teodorovic <[email protected]>

* add heartbeat and app registering to agent

Signed-off-by: Mirko Teodorovic <[email protected]>

* add logic for monitoring service status

Signed-off-by: Mirko Teodorovic <[email protected]>

* add view application status

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove bin

Signed-off-by: Mirko Teodorovic <[email protected]>

* add ticker for counting inactivity

Signed-off-by: Mirko Teodorovic <[email protected]>

* add check for string length and minor fix

Signed-off-by: Mirko Teodorovic <[email protected]>

* add logger and string length check

Signed-off-by: Mirko Teodorovic <[email protected]>

* add logger and string length check

Signed-off-by: Mirko Teodorovic <[email protected]>

* dont use caps for consts

Signed-off-by: Mirko Teodorovic <[email protected]>

* update mod

Signed-off-by: Mirko Teodorovic <[email protected]>

* refactor, move register into agent

Signed-off-by: Mirko Teodorovic <[email protected]>

* refactor, move register into agent

Signed-off-by: Mirko Teodorovic <[email protected]>

* refactor, move register into agent

Signed-off-by: Mirko Teodorovic <[email protected]>

* refactor, move register into agent

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix string index check

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix error handling

Signed-off-by: Mirko Teodorovic <[email protected]>

* change visibility of error

Signed-off-by: Mirko Teodorovic <[email protected]>

* optimize topic parsing

Signed-off-by: Mirko Teodorovic <[email protected]>

* small fix

Signed-off-by: Mirko Teodorovic <[email protected]>

* chane logging messages

Signed-off-by: Mirko Teodorovic <[email protected]>

* inline and string index check  correction

Signed-off-by: Mirko Teodorovic <[email protected]>

* applications -> services

Signed-off-by: Mirko Teodorovic <[email protected]>

* application ->services

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove binary

Signed-off-by: Mirko Teodorovic <[email protected]>

* change method name

Signed-off-by: Mirko Teodorovic <[email protected]>

* change labels

Signed-off-by: Mirko Teodorovic <[email protected]>
  • Loading branch information
mteodor authored and drasko committed Jan 16, 2020
1 parent 3ceaacb commit 2b177f4
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 29 deletions.
27 changes: 21 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/mainflux/agent/pkg/edgex"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
nats "github.com/nats-io/go-nats"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -37,6 +38,7 @@ const (
defDataChan = "ea353dac-0298-4fbb-9e5d-501e3699949c"
defEncryption = "false"
defConfigFile = "config.toml"
defNatsURL = nats.DefaultURL

envConfigFile = "MF_AGENT_CONFIG_FILE"
envLogLevel = "MF_AGENT_LOG_LEVEL"
Expand All @@ -53,6 +55,7 @@ const (
envCtrlChan = "MF_AGENT_CONTROL_CHANNEL"
envDataChan = "MF_AGENT_DATA_CHANNEL"
envEncryption = "MF_AGENT_ENCRYPTION"
envNatsURL = "MF_AGENT_NATS_URL"
)

func main() {
Expand All @@ -66,10 +69,19 @@ func main() {
log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error()))
}

nc, err := nats.Connect(cfg.Agent.Server.NatsURL)
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL))
}
defer nc.Close()

mqttClient := connectToMQTTBroker(cfg.Agent.MQTT.URL, cfg.Agent.Thing.ID, cfg.Agent.Thing.Key, logger)
edgexClient := edgex.NewClient(cfg.Agent.Edgex.URL, logger)

svc := agent.New(mqttClient, &cfg, edgexClient, logger)
svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger)
if err != nil {
log.Fatalf(fmt.Sprintf("Error in agent service: %s", err.Error()))
}

svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand All @@ -87,7 +99,7 @@ func main() {
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
go subscribeToMQTTBroker(svc, mqttClient, cfg.Agent.Channels.Control, logger)
go subscribeToMQTTBroker(svc, mqttClient, cfg.Agent.Channels.Control, nc, logger)

errs := make(chan error, 3)

Expand Down Expand Up @@ -122,7 +134,10 @@ func loadConfig(logger logger.Logger) (config.Config, error) {
return config.Config{}, err
}

sc := config.ServerConf{Port: mainflux.Env(envLogLevel, defLogLevel)}
sc := config.ServerConf{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envLogLevel, defLogLevel),
}
tc := config.ThingConf{
ID: mainflux.Env(envThingID, defThingID),
Key: mainflux.Env(envThingKey, defThingKey),
Expand Down Expand Up @@ -170,9 +185,9 @@ func connectToMQTTBroker(mqttURL, thingID, thingKey string, logger logger.Logger
return client
}

func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, logger logger.Logger) {
broker := mqtt.NewBroker(svc, mc, logger)
topic := fmt.Sprintf("channels/%s/messages/req", ctrlChan)
func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) {
broker := mqtt.NewBroker(svc, mc, nc, logger)
topic := fmt.Sprintf("channels/%s/messages", ctrlChan)
if err := broker.Subscribe(topic); err != nil {
logger.Error(fmt.Sprintf("Failed to subscribe to MQTT broker: %s", err.Error()))
os.Exit(1)
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ require (
github.com/edgexfoundry/go-mod-core-contracts v0.1.35
github.com/go-kit/kit v0.9.0
github.com/go-zoo/bone v1.3.0
github.com/gogo/protobuf v1.3.1
github.com/mainflux/mainflux v0.0.0-20191129194728-8c4da8503963
github.com/mainflux/senml v1.0.0
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.6.0
github.com/pelletier/go-toml v1.4.0
github.com/prometheus/client_golang v1.2.1
github.com/stretchr/testify v1.4.0
robpike.io/filter v0.0.0-20150108201509-2984852a2183
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ github.com/gocql/gocql v0.0.0-20181106112037-68ae1e384be4/go.mod h1:4Fw1eo5iaEhD
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
Expand Down Expand Up @@ -133,7 +134,11 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44=
github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ=
github.com/nats-io/go-nats v1.6.0 h1:FznPwMfrVwGnSCh7JTXyJDRW0TIkD4Tr+M1LPJt9T70=
github.com/nats-io/go-nats v1.6.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs=
github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
Expand Down Expand Up @@ -214,6 +219,7 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -280,4 +286,6 @@ gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
robpike.io/filter v0.0.0-20150108201509-2984852a2183 h1:b7Y5VfvTcuK1JCT6YKDIaw5w8j/AYBz6DkX/pStQCsM=
robpike.io/filter v0.0.0-20150108201509-2984852a2183/go.mod h1:JQLSCVDQbISDYGuIQqGa40P6NsyycM3cVAiOKl0tjfI=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
8 changes: 7 additions & 1 deletion internal/app/agent/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint {

func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint {
return func(_ context.Context, request interface{}) (interface{}, error) {
c := svc.ViewConfig()
c := svc.Config()

sc := serverConf{port: c.Agent.Server.Port}
cc := chanConf{
Expand All @@ -126,3 +126,9 @@ func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint {
return res, nil
}
}

func viewServicesEndpoint(svc agent.Service) endpoint.Endpoint {
return func(_ context.Context, request interface{}) (interface{}, error) {
return svc.Services(), nil
}
}
18 changes: 14 additions & 4 deletions internal/app/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
log "github.com/mainflux/mainflux/logger"
)
Expand Down Expand Up @@ -65,7 +66,7 @@ func (lm loggingMiddleware) Control(uuid, cmd string) (err error) {

func (lm loggingMiddleware) AddConfig(c config.Config) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method AddConfig took %s to complete", time.Since(begin))
message := fmt.Sprintf("Method add_config took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand All @@ -76,11 +77,20 @@ func (lm loggingMiddleware) AddConfig(c config.Config) (err error) {
return lm.svc.AddConfig(c)
}

func (lm loggingMiddleware) ViewConfig() config.Config {
func (lm loggingMiddleware) Config() config.Config {
defer func(begin time.Time) {
message := fmt.Sprintf("Method ViewConfig took %s to complete", time.Since(begin))
message := fmt.Sprintf("Method config took %s to complete", time.Since(begin))
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.ViewConfig()
return lm.svc.Config()
}

func (lm loggingMiddleware) Services() map[string]*services.Service {
defer func(begin time.Time) {
message := fmt.Sprintf("Method services took %s to complete", time.Since(begin))
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Services()
}
22 changes: 16 additions & 6 deletions internal/app/agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/kit/metrics"
"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
)

Expand Down Expand Up @@ -57,19 +58,28 @@ func (ms *metricsMiddleware) AddConfig(ec config.Config) error {
return ms.svc.AddConfig(ec)
}

func (ms *metricsMiddleware) ViewConfig() config.Config {
func (ms *metricsMiddleware) Config() config.Config {
defer func(begin time.Time) {
ms.counter.With("method", "view_config").Add(1)
ms.latency.With("method", "view_config").Observe(time.Since(begin).Seconds())
ms.counter.With("method", "config").Add(1)
ms.latency.With("method", "config").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.ViewConfig()
return ms.svc.Config()
}

func (ms *metricsMiddleware) Services() map[string]*services.Service {
defer func(begin time.Time) {
ms.counter.With("method", "services").Add(1)
ms.latency.With("method", "services").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Services()
}

func (ms *metricsMiddleware) Publish(topic, payload string) error {
defer func(begin time.Time) {
ms.counter.With("method", "execute").Add(1)
ms.latency.With("method", "execute").Observe(time.Since(begin).Seconds())
ms.counter.With("method", "publish").Add(1)
ms.latency.With("method", "publish").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Publish(topic, payload)
Expand Down
6 changes: 6 additions & 0 deletions internal/app/agent/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func MakeHandler(svc agent.Service) http.Handler {
encodeResponse,
))

r.Get("/services", kithttp.NewServer(
viewServicesEndpoint(svc),
decodeRequest,
encodeResponse,
))

r.GetFunc("/version", mainflux.Version("agent"))
r.Handle("/metrics", promhttp.Handler())

Expand Down
58 changes: 51 additions & 7 deletions internal/app/agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
package agent

import (
"errors"
"fmt"
"os/exec"
"strings"

paho "github.com/eclipse/paho.mqtt.golang"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
"github.com/mainflux/agent/pkg/edgex"
"github.com/mainflux/mainflux/errors"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/senml"
"github.com/nats-io/go-nats"
)

const (
Path = "./config.toml"
Path = "./config.toml"
Hearbeat = "heartbeat.*"
)

var (
Expand All @@ -32,6 +35,9 @@ var (

// errUnknownCommand indicates that command is not found
errUnknownCommand = errors.New("Unknown command")

// errNatsSubscribing indicates problem with sub to topic for heartbeat
errNatsSubscribing = errors.New("failed to subscribe to heartbeat topic")
)

// Service specifies API for publishing messages and subscribing to topics.
Expand All @@ -45,8 +51,11 @@ type Service interface {
// Update configuration file
AddConfig(config.Config) error

// View returns Config struct created from config file
ViewConfig() config.Config
// Config returns Config struct created from config file
Config() config.Config

// Services returns service list
Services() map[string]*services.Service

// Publish message
Publish(string, string) error
Expand All @@ -59,16 +68,47 @@ type agent struct {
config *config.Config
edgexClient edgex.Client
logger log.Logger
nats *nats.Conn
servs map[string]*services.Service
}

// New returns agent service implementation.
func New(mc paho.Client, cfg *config.Config, ec edgex.Client, logger log.Logger) Service {
return &agent{
func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, logger log.Logger) (Service, errors.Error) {
ag := &agent{
mqttClient: mc,
edgexClient: ec,
config: cfg,
nats: nc,
logger: logger,
servs: make(map[string]*services.Service),
}

_, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) {
sub := msg.Subject
tok := strings.Split(sub, ".")
if len(tok) < 2 {
ag.logger.Error(fmt.Sprintf("Failed: Subject has incorrect length %s" + sub))
return
}
servname := tok[1]
// Service name is extracted from the subtopic
// if there is multiple instances of the same service
// we will have to add another distinction
if _, ok := ag.servs[servname]; !ok {
serv := services.NewService(servname)
ag.servs[servname] = serv
ag.logger.Info(fmt.Sprintf("Services '%s' registered", servname))
}
serv := ag.servs[servname]
serv.Update()
})

if err != nil {
return ag, errors.Wrap(errNatsSubscribing, err)
}

return ag, nil

}

func (a *agent) Execute(uuid, cmd string) (string, error) {
Expand Down Expand Up @@ -137,10 +177,14 @@ func (a *agent) AddConfig(c config.Config) error {
return c.Save()
}

func (a *agent) ViewConfig() config.Config {
func (a *agent) Config() config.Config {
return *a.config
}

func (a *agent) Services() map[string]*services.Service {
return a.servs
}

func (a *agent) Publish(crtlChan, payload string) error {
topic := fmt.Sprintf("channels/%s/messages/res", crtlChan)
token := a.mqttClient.Publish(topic, 0, false, payload)
Expand Down
Loading

0 comments on commit 2b177f4

Please sign in to comment.