Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AG-22 - Add Agent Tests #58

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: Build and Publish Docker Image
on:
push:
branches:
- tests # Change this to your main branch name if different
SammyOina marked this conversation as resolved.
Show resolved Hide resolved

jobs:
build-and-publish:
runs-on: ubuntu-latest

steps:
- name: Checkout Repository
uses: actions/checkout@v2
SammyOina marked this conversation as resolved.
Show resolved Hide resolved

- name: Set up Go
uses: actions/setup-go@v2
SammyOina marked this conversation as resolved.
Show resolved Hide resolved
with:
go-version: 1.21 # Set the Go version you want to use

- name: Build Docker Images
run: |
make dockers

- name: Docker Login
SammyOina marked this conversation as resolved.
Show resolved Hide resolved
run: |
echo ${{ secrets.DOCKER_PASSWORD }} | docker login -u ${{ secrets.DOCKER_USERNAME }} --password-stdin
env:
DOCKER_USERNAME: ${{ github.actor }}
DOCKER_PASSWORD: ${{ secrets.GITHUB_TOKEN }}

- name: Push Docker Images
run: |
make latest
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ define make_docker
--build-arg VERSION=$(VERSION) \
--build-arg COMMIT=$(COMMIT) \
--build-arg TIME=$(TIME) \
--tag=mainflux/$(svc) \
--tag=sammyoina/$(svc) \
SammyOina marked this conversation as resolved.
Show resolved Hide resolved
-f docker/Dockerfile .
endef

Expand Down Expand Up @@ -82,7 +82,7 @@ dockers_dev: $(DOCKERS_DEV)

define docker_push
for svc in $(SERVICES); do \
docker push mainflux/$$svc:$(1); \
docker push sammyoina/$$svc:$(1); \
SammyOina marked this conversation as resolved.
Show resolved Hide resolved
done
endef

Expand Down
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func main() {
logger.Error(fmt.Sprintf("Error in agent service: %s", err))
return
}
defer svc.Close()

svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand Down Expand Up @@ -402,7 +403,7 @@ func StopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger lo
shutdownCtx, shutdownCancel := context.WithTimeout(ctx, 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
return fmt.Errorf("Failed to shutdown %s server: %v", svcName, err)
return fmt.Errorf("failed to shutdown %s server: %v", svcName, err)
}
return fmt.Errorf("%s service shutdown by signal: %s", svcName, sig)
case <-ctx.Done():
Expand Down
5 changes: 3 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ WORKDIR /go/src/github.com/mainflux/agent
COPY . .
RUN apk update \
&& apk add make\
&& make $SVC \
&& mv build/mainflux-$SVC /exe
&& make $SVC
RUN ls build
SammyOina marked this conversation as resolved.
Show resolved Hide resolved
RUN mv build/mainflux-$SVC /exe
SammyOina marked this conversation as resolved.
Show resolved Hide resolved

FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/api/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func TestPublish(t *testing.T) {
{"publish data", data, http.StatusOK},
{"publish data with invalid data", "}", http.StatusInternalServerError},
}
t.Cleanup(func() {
assert.Nil(t, svc.Close())
})

for _, tc := range cases {
req := testRequest{
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,16 @@ func (lm loggingMiddleware) Terminal(uuid, cmdStr string) (err error) {

return lm.svc.Terminal(uuid, cmdStr)
}

func (lm loggingMiddleware) Close() (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method close took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Close()
}
13 changes: 11 additions & 2 deletions pkg/agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,18 @@ func (ms *metricsMiddleware) Publish(topic, payload string) error {

func (ms *metricsMiddleware) Terminal(topic, payload string) error {
defer func(begin time.Time) {
ms.counter.With("method", "publish").Add(1)
ms.latency.With("method", "publish").Observe(time.Since(begin).Seconds())
ms.counter.With("method", "terminal").Add(1)
ms.latency.With("method", "terminal").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Terminal(topic, payload)
}

func (ms *metricsMiddleware) Close() error {
defer func(begin time.Time) {
ms.counter.With("method", "close").Add(1)
ms.latency.With("method", "close").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Close()
}
18 changes: 12 additions & 6 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ package agent
import (
"crypto/tls"
"encoding/json"
"fmt"
"os"
"time"

"github.com/mainflux/mainflux/pkg/errors"
"github.com/pelletier/go-toml"
)

var (
ErrWritingToml = errors.New("error writing to toml file")
errReadingFile = errors.New("error reading config file")
errUnmarshalToml = errors.New("error unmarshaling toml")
errMarshalToml = errors.New("error marshaling toml")
)

type ServerConfig struct {
Port string `toml:"port" json:"port"`
BrokerURL string `toml:"broker_url" json:"broker_url"`
Expand Down Expand Up @@ -86,24 +92,24 @@ func NewConfig(sc ServerConfig, cc ChanConfig, ec EdgexConfig, lc LogConfig, mc
func SaveConfig(c Config) error {
b, err := toml.Marshal(c)
if err != nil {
return errors.New(fmt.Sprintf("Error reading config file: %s", err))
return errors.Wrap(errMarshalToml, err)
}
if err := os.WriteFile(c.File, b, 0644); err != nil {
return errors.New(fmt.Sprintf("Error writing toml: %s", err))
return errors.Wrap(ErrWritingToml, err)
}
return nil
}

// Read - retrieve config from a file.
// ReadConfig - retrieve config from a file.
func ReadConfig(file string) (Config, error) {
data, err := os.ReadFile(file)
c := Config{}
if err != nil {
return c, errors.New(fmt.Sprintf("Error reading config file: %s", err))
return Config{}, errors.Wrap(errReadingFile, err)
}

if err := toml.Unmarshal(data, &c); err != nil {
return Config{}, errors.New(fmt.Sprintf("Error unmarshaling toml: %s", err))
return Config{}, errors.Wrap(errUnmarshalToml, err)
}
return c, nil
}
Expand Down
103 changes: 103 additions & 0 deletions pkg/agent/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package agent

import (
"fmt"
"os"
"strings"
"testing"

"github.com/mainflux/mainflux/pkg/errors"
"github.com/stretchr/testify/assert"
)

func TestReadConfig(t *testing.T) {
// Create a temporary config file for testing.
tempFile, err := os.CreateTemp("", "config.toml")
if err != nil {
t.Fatalf("Failed to create temporary file: %v", err)
}
defer os.Remove(tempFile.Name())
tempFile2, err := os.CreateTemp("", "invalid.toml")
if err != nil {
t.Fatalf("Failed to create temporary file: %v", err)
}
defer os.Remove(tempFile2.Name())

sampleConfig := `
File = "config.toml"

[channels]
control = ""
data = ""

[edgex]
url = "http://localhost:48090/api/v1/"

[heartbeat]
interval = "10s"

[log]
level = "info"

[mqtt]
ca_cert = ""
ca_path = "ca.crt"
cert_path = "thing.cert"
client_cert = ""
client_key = ""
mtls = false
password = ""
priv_key_path = "thing.key"
qos = 0
retain = false
skip_tls_ver = true
url = "localhost:1883"
username = ""

[server]
nats_url = "nats://127.0.0.1:4222"
port = "9999"

[terminal]
session_timeout = "1m0s"
`

if _, writeErr := tempFile.WriteString(sampleConfig); writeErr != nil {
t.Fatalf("Failed to write to temporary file: %v", writeErr)
}
tempFile.Close()

if _, writeErr := tempFile2.WriteString(strings.ReplaceAll(sampleConfig, "[", "")); writeErr != nil {
t.Fatalf("Failed to write to temporary file: %v", writeErr)
}
tempFile2.Close()

tests := []struct {
name string
fileName string
expectedErr error
}{
{
name: "failed to read file",
fileName: "invalidFile.toml",
expectedErr: errReadingFile,
},
{
name: "invalid toml",
fileName: tempFile2.Name(),
expectedErr: errUnmarshalToml,
},
{
name: "successful read",
fileName: tempFile.Name(),
expectedErr: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := ReadConfig(test.fileName)
assert.True(t, errors.Contains(err, test.expectedErr), fmt.Sprintf("expected %v got %v", test.expectedErr, err))
})
}
}
26 changes: 19 additions & 7 deletions pkg/agent/heartbeat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"context"
"sync"
"time"
)
Expand Down Expand Up @@ -33,11 +34,12 @@ type Info struct {
type Heartbeat interface {
Update()
Info() Info
Close()
}

// interval - duration of interval
// if service doesnt send heartbeat during interval it is marked offline.
func NewHeartbeat(name, svcType string, interval time.Duration) Heartbeat {
func NewHeartbeat(ctx context.Context, name, svcType string, interval time.Duration) Heartbeat {
ticker := time.NewTicker(interval)
s := svc{
info: Info{
Expand All @@ -49,22 +51,25 @@ func NewHeartbeat(name, svcType string, interval time.Duration) Heartbeat {
ticker: ticker,
interval: interval,
}
s.listen()
go s.listen(ctx)
return &s
}

func (s *svc) listen() {
go func() {
for range s.ticker.C {
func (s *svc) listen(ctx context.Context) {
for {
select {
case <-s.ticker.C:
// TODO - we can disable ticker when the status gets OFFLINE
// and on the next heartbeat enable it again.
s.mu.Lock()
if time.Now().After(s.info.LastSeen.Add(s.interval)) {
s.info.Status = offline
}
s.mu.Unlock()
case <-ctx.Done():
return
}
}()
}
}

func (s *svc) Update() {
Expand All @@ -75,5 +80,12 @@ func (s *svc) Update() {
}

func (s *svc) Info() Info {
return s.info
s.mu.Lock()
defer s.mu.Unlock()
info := s.info
return info
}

func (s *svc) Close() {
s.ticker.Stop()
}
Loading