Skip to content
This repository has been archived by the owner on Oct 14, 2024. It is now read-only.

NOISSUE - Make Check Interval Configurable #277

Merged
merged 3 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions pkg/events/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
stream: stream,
}

go es.StartPublishingRoutine(ctx)

return es, nil
}

Expand All @@ -74,15 +72,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
return es.publisher.Publish(ctx, es.stream, record)
}

func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) {
// Nats doesn't need to check for unpublished events
// since the events are published to a buffer.
// The buffer is flushed when the connection is reestablished.
// https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer

<-ctx.Done()
}

func (es *pubEventStore) Close() error {
es.conn.Close()

Expand Down
143 changes: 76 additions & 67 deletions pkg/events/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
numEvents = 100
)

type testEvent struct {
Expand Down Expand Up @@ -50,16 +51,19 @@ func (te testEvent) Encode() (map[string]interface{}, error) {
}

func TestPublish(t *testing.T) {
publisher, err := nats.NewPublisher(ctx, natsURL, stream)
_, err := nats.NewPublisher(context.Background(), "http://invaliurl.com", stream)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

_, err = nats.NewSubscriber(ctx, "http://invaliurl.com", stream, consumer, logger)
_, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", stream, consumer, logger)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

subcriber, err := nats.NewSubscriber(ctx, natsURL, stream, consumer, logger)
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(ctx, handler{})
err = subcriber.Subscribe(context.Background(), handler{})
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

cases := []struct {
Expand Down Expand Up @@ -122,11 +126,9 @@ func TestPublish(t *testing.T) {
for _, tc := range cases {
event := testEvent{Data: tc.event}

err := publisher.Publish(ctx, event)
err := publisher.Publish(context.Background(), event)
switch tc.err {
case nil:
assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err))

receivedEvent := <-eventsChan

val := int64(receivedEvent["occurred_at"].(float64))
Expand All @@ -135,67 +137,18 @@ func TestPublish(t *testing.T) {
delete(tc.event, "occurred_at")
}

assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"]))
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"]))
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"]))
assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"]))
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"]))
assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"]))

assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, tc.event["status"], receivedEvent["status"])
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, tc.event["operation"], receivedEvent["operation"])
default:
assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err))
assert.ErrorContains(t, err, tc.err.Error())
}
}
}

func TestUnavailablePublish(t *testing.T) {
_, err := nats.NewPublisher(ctx, "http://invaliurl.com", stream)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

publisher, err := nats.NewPublisher(ctx, natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err))

spawnGoroutines(publisher, t)

err = pool.Client.UnpauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err))

// Wait for the events to be published.
time.Sleep(events.UnpublishedEventsCheckInterval)

err = publisher.Close()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err))
}

func generateRandomEvent() testEvent {
return testEvent{
Data: map[string]interface{}{
"temperature": fmt.Sprintf("%f", rand.Float64()),
"humidity": fmt.Sprintf("%f", rand.Float64()),
"sensor_id": fmt.Sprintf("%d", rand.Intn(1000)),
"location": fmt.Sprintf("%f", rand.Float64()),
"status": fmt.Sprintf("%d", rand.Intn(1000)),
"timestamp": fmt.Sprintf("%d", time.Now().UnixNano()),
"operation": "create",
},
}
}

func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < 1e4; i++ {
go func() {
for i := 0; i < 10; i++ {
event := generateRandomEvent()
err := publisher.Publish(ctx, event)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}
}()
}
}

func TestPubsub(t *testing.T) {
subcases := []struct {
desc string
Expand Down Expand Up @@ -256,16 +209,14 @@ func TestPubsub(t *testing.T) {
}

for _, pc := range subcases {
subcriber, err := nats.NewSubscriber(ctx, natsURL, pc.stream, pc.consumer, logger)
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, pc.stream, pc.consumer, logger)
if err != nil {
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err))

continue
}

assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))

switch err := subcriber.Subscribe(context.TODO(), pc.handler); {
switch err := subcriber.Subscribe(context.Background(), pc.handler); {
case err == nil:
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
default:
Expand All @@ -277,6 +228,64 @@ func TestPubsub(t *testing.T) {
}
}

func TestUnavailablePublish(t *testing.T) {
publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(context.Background(), handler{})
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err))

spawnGoroutines(publisher, t)

time.Sleep(1 * time.Second)

err = pool.Client.UnpauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err))

// Wait for the events to be published.
time.Sleep(1 * time.Second)

err = publisher.Close()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err))

// read all the events from the channel and assert that they are 10.
var receivedEvents []map[string]interface{}
for i := 0; i < numEvents; i++ {
event := <-eventsChan
receivedEvents = append(receivedEvents, event)
}
assert.Len(t, receivedEvents, numEvents, "got unexpected number of events")
}

func generateRandomEvent() testEvent {
return testEvent{
Data: map[string]interface{}{
"temperature": fmt.Sprintf("%f", rand.Float64()),
"humidity": fmt.Sprintf("%f", rand.Float64()),
"sensor_id": fmt.Sprintf("%d", rand.Intn(1000)),
"location": fmt.Sprintf("%f", rand.Float64()),
"status": fmt.Sprintf("%d", rand.Intn(1000)),
"timestamp": fmt.Sprintf("%d", time.Now().UnixNano()),
"operation": "create",
},
}
}

func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < numEvents; i++ {
go func() {
err := publisher.Publish(context.Background(), generateRandomEvent())
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}()
}
}

type handler struct {
fail bool
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/events/nats/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ var (
natsURL string
stream = "tests.events"
consumer = "tests-consumer"
ctx = context.Background()
pool *dockertest.Pool
container *dockertest.Resource
)
Expand All @@ -33,7 +32,6 @@ func TestMain(m *testing.M) {
}

container, err = pool.RunWithOptions(&dockertest.RunOptions{
Name: "test-nats-events",
Repository: "nats",
Tag: "2.10.9-alpine",
Cmd: []string{"-DVV", "-js"},
Expand All @@ -47,14 +45,14 @@ func TestMain(m *testing.M) {
natsURL = fmt.Sprintf("nats://%s:%s", "localhost", container.GetPort("4222/tcp"))

if err := pool.Retry(func() error {
_, err = nats.NewPublisher(ctx, natsURL, stream)
_, err = nats.NewPublisher(context.Background(), natsURL, stream)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

if err := pool.Retry(func() error {
_, err = nats.NewSubscriber(ctx, natsURL, stream, consumer, logger)
_, err = nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
Expand Down
50 changes: 6 additions & 44 deletions pkg/events/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rabbitmq
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/absmach/magistrala/pkg/events"
Expand All @@ -16,11 +15,9 @@ import (
)

type pubEventStore struct {
conn *amqp.Connection
publisher messaging.Publisher
unpublishedEvents chan amqp.Return
stream string
mu sync.Mutex
conn *amqp.Connection
publisher messaging.Publisher
stream string
}

func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
Expand All @@ -42,16 +39,11 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
}

es := &pubEventStore{
conn: conn,
publisher: publisher,
unpublishedEvents: make(chan amqp.Return, events.MaxUnpublishedEvents),
stream: stream,
conn: conn,
publisher: publisher,
stream: stream,
}

ch.NotifyReturn(es.unpublishedEvents)

go es.StartPublishingRoutine(ctx)

return es, nil
}

Expand All @@ -74,36 +66,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
return es.publisher.Publish(ctx, es.stream, record)
}

func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) {
defer close(es.unpublishedEvents)

ticker := time.NewTicker(events.UnpublishedEventsCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if ok := es.conn.IsClosed(); !ok {
es.mu.Lock()
for i := len(es.unpublishedEvents) - 1; i >= 0; i-- {
record := <-es.unpublishedEvents
msg := &messaging.Message{
Payload: record.Body,
}
if err := es.publisher.Publish(ctx, es.stream, msg); err != nil {
es.unpublishedEvents <- record

break
}
}
es.mu.Unlock()
}
case <-ctx.Done():
return
}
}
}

func (es *pubEventStore) Close() error {
es.conn.Close()

Expand Down
Loading
Loading