Skip to content
This repository has been archived by the owner on Oct 14, 2024. It is now read-only.

Commit

Permalink
NOISSUE - Make Check Interval Configurable (#277)
Browse files Browse the repository at this point in the history
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>
  • Loading branch information
rodneyosodo authored Jan 25, 2024
1 parent 867060f commit 75b65af
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 226 deletions.
11 changes: 0 additions & 11 deletions pkg/events/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
stream: stream,
}

go es.StartPublishingRoutine(ctx)

return es, nil
}

Expand All @@ -74,15 +72,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
return es.publisher.Publish(ctx, es.stream, record)
}

func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) {
// Nats doesn't need to check for unpublished events
// since the events are published to a buffer.
// The buffer is flushed when the connection is reestablished.
// https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer

<-ctx.Done()
}

func (es *pubEventStore) Close() error {
es.conn.Close()

Expand Down
143 changes: 76 additions & 67 deletions pkg/events/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
numEvents = 100
)

type testEvent struct {
Expand Down Expand Up @@ -50,16 +51,19 @@ func (te testEvent) Encode() (map[string]interface{}, error) {
}

func TestPublish(t *testing.T) {
publisher, err := nats.NewPublisher(ctx, natsURL, stream)
_, err := nats.NewPublisher(context.Background(), "http://invaliurl.com", stream)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

_, err = nats.NewSubscriber(ctx, "http://invaliurl.com", stream, consumer, logger)
_, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", stream, consumer, logger)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

subcriber, err := nats.NewSubscriber(ctx, natsURL, stream, consumer, logger)
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(ctx, handler{})
err = subcriber.Subscribe(context.Background(), handler{})
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

cases := []struct {
Expand Down Expand Up @@ -122,11 +126,9 @@ func TestPublish(t *testing.T) {
for _, tc := range cases {
event := testEvent{Data: tc.event}

err := publisher.Publish(ctx, event)
err := publisher.Publish(context.Background(), event)
switch tc.err {
case nil:
assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err))

receivedEvent := <-eventsChan

val := int64(receivedEvent["occurred_at"].(float64))
Expand All @@ -135,67 +137,18 @@ func TestPublish(t *testing.T) {
delete(tc.event, "occurred_at")
}

assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"]))
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"]))
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"]))
assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"]))
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"]))
assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"]))

assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, tc.event["status"], receivedEvent["status"])
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, tc.event["operation"], receivedEvent["operation"])
default:
assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err))
assert.ErrorContains(t, err, tc.err.Error())
}
}
}

func TestUnavailablePublish(t *testing.T) {
_, err := nats.NewPublisher(ctx, "http://invaliurl.com", stream)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

publisher, err := nats.NewPublisher(ctx, natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err))

spawnGoroutines(publisher, t)

err = pool.Client.UnpauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err))

// Wait for the events to be published.
time.Sleep(events.UnpublishedEventsCheckInterval)

err = publisher.Close()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err))
}

func generateRandomEvent() testEvent {
return testEvent{
Data: map[string]interface{}{
"temperature": fmt.Sprintf("%f", rand.Float64()),
"humidity": fmt.Sprintf("%f", rand.Float64()),
"sensor_id": fmt.Sprintf("%d", rand.Intn(1000)),
"location": fmt.Sprintf("%f", rand.Float64()),
"status": fmt.Sprintf("%d", rand.Intn(1000)),
"timestamp": fmt.Sprintf("%d", time.Now().UnixNano()),
"operation": "create",
},
}
}

func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < 1e4; i++ {
go func() {
for i := 0; i < 10; i++ {
event := generateRandomEvent()
err := publisher.Publish(ctx, event)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}
}()
}
}

func TestPubsub(t *testing.T) {
subcases := []struct {
desc string
Expand Down Expand Up @@ -256,16 +209,14 @@ func TestPubsub(t *testing.T) {
}

for _, pc := range subcases {
subcriber, err := nats.NewSubscriber(ctx, natsURL, pc.stream, pc.consumer, logger)
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, pc.stream, pc.consumer, logger)
if err != nil {
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err))

continue
}

assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))

switch err := subcriber.Subscribe(context.TODO(), pc.handler); {
switch err := subcriber.Subscribe(context.Background(), pc.handler); {
case err == nil:
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
default:
Expand All @@ -277,6 +228,64 @@ func TestPubsub(t *testing.T) {
}
}

func TestUnavailablePublish(t *testing.T) {
publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(context.Background(), handler{})
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err))

spawnGoroutines(publisher, t)

time.Sleep(1 * time.Second)

err = pool.Client.UnpauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err))

// Wait for the events to be published.
time.Sleep(1 * time.Second)

err = publisher.Close()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err))

// read all the events from the channel and assert that they are 10.
var receivedEvents []map[string]interface{}
for i := 0; i < numEvents; i++ {
event := <-eventsChan
receivedEvents = append(receivedEvents, event)
}
assert.Len(t, receivedEvents, numEvents, "got unexpected number of events")
}

func generateRandomEvent() testEvent {
return testEvent{
Data: map[string]interface{}{
"temperature": fmt.Sprintf("%f", rand.Float64()),
"humidity": fmt.Sprintf("%f", rand.Float64()),
"sensor_id": fmt.Sprintf("%d", rand.Intn(1000)),
"location": fmt.Sprintf("%f", rand.Float64()),
"status": fmt.Sprintf("%d", rand.Intn(1000)),
"timestamp": fmt.Sprintf("%d", time.Now().UnixNano()),
"operation": "create",
},
}
}

func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < numEvents; i++ {
go func() {
err := publisher.Publish(context.Background(), generateRandomEvent())
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}()
}
}

type handler struct {
fail bool
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/events/nats/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ var (
natsURL string
stream = "tests.events"
consumer = "tests-consumer"
ctx = context.Background()
pool *dockertest.Pool
container *dockertest.Resource
)
Expand All @@ -33,7 +32,6 @@ func TestMain(m *testing.M) {
}

container, err = pool.RunWithOptions(&dockertest.RunOptions{
Name: "test-nats-events",
Repository: "nats",
Tag: "2.10.9-alpine",
Cmd: []string{"-DVV", "-js"},
Expand All @@ -47,14 +45,14 @@ func TestMain(m *testing.M) {
natsURL = fmt.Sprintf("nats://%s:%s", "localhost", container.GetPort("4222/tcp"))

if err := pool.Retry(func() error {
_, err = nats.NewPublisher(ctx, natsURL, stream)
_, err = nats.NewPublisher(context.Background(), natsURL, stream)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

if err := pool.Retry(func() error {
_, err = nats.NewSubscriber(ctx, natsURL, stream, consumer, logger)
_, err = nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
Expand Down
50 changes: 6 additions & 44 deletions pkg/events/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rabbitmq
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/absmach/magistrala/pkg/events"
Expand All @@ -16,11 +15,9 @@ import (
)

type pubEventStore struct {
conn *amqp.Connection
publisher messaging.Publisher
unpublishedEvents chan amqp.Return
stream string
mu sync.Mutex
conn *amqp.Connection
publisher messaging.Publisher
stream string
}

func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
Expand All @@ -42,16 +39,11 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
}

es := &pubEventStore{
conn: conn,
publisher: publisher,
unpublishedEvents: make(chan amqp.Return, events.MaxUnpublishedEvents),
stream: stream,
conn: conn,
publisher: publisher,
stream: stream,
}

ch.NotifyReturn(es.unpublishedEvents)

go es.StartPublishingRoutine(ctx)

return es, nil
}

Expand All @@ -74,36 +66,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
return es.publisher.Publish(ctx, es.stream, record)
}

func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) {
defer close(es.unpublishedEvents)

ticker := time.NewTicker(events.UnpublishedEventsCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if ok := es.conn.IsClosed(); !ok {
es.mu.Lock()
for i := len(es.unpublishedEvents) - 1; i >= 0; i-- {
record := <-es.unpublishedEvents
msg := &messaging.Message{
Payload: record.Body,
}
if err := es.publisher.Publish(ctx, es.stream, msg); err != nil {
es.unpublishedEvents <- record

break
}
}
es.mu.Unlock()
}
case <-ctx.Done():
return
}
}
}

func (es *pubEventStore) Close() error {
es.conn.Close()

Expand Down
Loading

0 comments on commit 75b65af

Please sign in to comment.