Skip to content
This repository has been archived by the owner on Oct 14, 2024. It is now read-only.

Commit

Permalink
test(evnts): Test TestUnavailablePublish
Browse files Browse the repository at this point in the history
This commit adds a new test case to the publisher. It also removes a method that is no longer needed. The code in this file handles publishing and subscribing to events using RabbitMQ and Redis. It includes logic for pausing and unpausing a container, generating random events, and checking the number of received events. Furthermore, an argument is added to the NewPublisher function to allow specifying an interval for checking unpublished events.

Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo committed Jan 24, 2024
1 parent c122dd6 commit 844d5cf
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 80 deletions.
11 changes: 0 additions & 11 deletions pkg/events/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
stream: stream,
}

go es.StartPublishingRoutine(ctx)

return es, nil
}

Expand All @@ -74,15 +72,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
return es.publisher.Publish(ctx, es.stream, record)
}

func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) {
// Nats doesn't need to check for unpublished events
// since the events are published to a buffer.
// The buffer is flushed when the connection is reestablished.
// https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer

<-ctx.Done()
}

func (es *pubEventStore) Close() error {
es.conn.Close()

Expand Down
62 changes: 62 additions & 0 deletions pkg/events/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
numEvents = 100
)

type testEvent struct {
Expand Down Expand Up @@ -50,6 +51,9 @@ func (te testEvent) Encode() (map[string]interface{}, error) {
}

func TestPublish(t *testing.T) {
_, err := nats.NewPublisher(context.Background(), "http://invaliurl.com", stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

Expand Down Expand Up @@ -224,6 +228,64 @@ func TestPubsub(t *testing.T) {
}
}

func TestUnavailablePublish(t *testing.T) {
publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(context.Background(), handler{})
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err))

spawnGoroutines(publisher, t)

time.Sleep(1 * time.Second)

err = pool.Client.UnpauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err))

// Wait for the events to be published.
time.Sleep(1 * time.Second)

err = publisher.Close()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err))

// read all the events from the channel and assert that they are 10.
var receivedEvents []map[string]interface{}
for i := 0; i < numEvents; i++ {
event := <-eventsChan
receivedEvents = append(receivedEvents, event)
}
assert.Len(t, receivedEvents, numEvents, "got unexpected number of events")
}

func generateRandomEvent() testEvent {
return testEvent{
Data: map[string]interface{}{
"temperature": fmt.Sprintf("%f", rand.Float64()),
"humidity": fmt.Sprintf("%f", rand.Float64()),
"sensor_id": fmt.Sprintf("%d", rand.Intn(1000)),
"location": fmt.Sprintf("%f", rand.Float64()),
"status": fmt.Sprintf("%d", rand.Intn(1000)),
"timestamp": fmt.Sprintf("%d", time.Now().UnixNano()),
"operation": "create",
},
}
}

func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < numEvents; i++ {
go func() {
err := publisher.Publish(context.Background(), generateRandomEvent())
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}()
}
}

type handler struct {
fail bool
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/events/nats/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@ import (
)

var (
natsURL string
stream = "tests.events"
consumer = "tests-consumer"
natsURL string
stream = "tests.events"
consumer = "tests-consumer"
pool *dockertest.Pool
container *dockertest.Resource
)

func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
var err error
pool, err = dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

container, err := pool.RunWithOptions(&dockertest.RunOptions{
container, err = pool.RunWithOptions(&dockertest.RunOptions{
Repository: "nats",
Tag: "2.10.9-alpine",
Cmd: []string{"-DVV", "-js"},
Expand Down
50 changes: 6 additions & 44 deletions pkg/events/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rabbitmq
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/absmach/magistrala/pkg/events"
Expand All @@ -16,11 +15,9 @@ import (
)

type pubEventStore struct {
conn *amqp.Connection
publisher messaging.Publisher
unpublishedEvents chan amqp.Return
stream string
mu sync.Mutex
conn *amqp.Connection
publisher messaging.Publisher
stream string
}

func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
Expand All @@ -42,16 +39,11 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
}

es := &pubEventStore{
conn: conn,
publisher: publisher,
unpublishedEvents: make(chan amqp.Return, events.MaxUnpublishedEvents),
stream: stream,
conn: conn,
publisher: publisher,
stream: stream,
}

ch.NotifyReturn(es.unpublishedEvents)

go es.StartPublishingRoutine(ctx)

return es, nil
}

Expand All @@ -74,36 +66,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
return es.publisher.Publish(ctx, es.stream, record)
}

func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) {
defer close(es.unpublishedEvents)

ticker := time.NewTicker(events.UnpublishedEventsCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if ok := es.conn.IsClosed(); !ok {
es.mu.Lock()
for i := len(es.unpublishedEvents) - 1; i >= 0; i-- {
record := <-es.unpublishedEvents
msg := &messaging.Message{
Payload: record.Body,
}
if err := es.publisher.Publish(ctx, es.stream, msg); err != nil {
es.unpublishedEvents <- record

break
}
}
es.mu.Unlock()
}
case <-ctx.Done():
return
}
}
}

func (es *pubEventStore) Close() error {
es.conn.Close()

Expand Down
62 changes: 62 additions & 0 deletions pkg/events/rabbitmq/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
numEvents = 100
)

type testEvent struct {
Expand Down Expand Up @@ -50,6 +51,9 @@ func (te testEvent) Encode() (map[string]interface{}, error) {
}

func TestPublish(t *testing.T) {
_, err := rabbitmq.NewPublisher(context.Background(), "http://invaliurl.com", stream)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

Expand Down Expand Up @@ -225,6 +229,64 @@ func TestPubsub(t *testing.T) {
}
}

func TestUnavailablePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(context.Background(), handler{})
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err))

spawnGoroutines(publisher, t)

time.Sleep(1 * time.Second)

err = pool.Client.UnpauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err))

// Wait for the events to be published.
time.Sleep(1 * time.Second)

err = publisher.Close()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err))

// read all the events from the channel and assert that they are 10.
var receivedEvents []map[string]interface{}
for i := 0; i < numEvents; i++ {
event := <-eventsChan
receivedEvents = append(receivedEvents, event)
}
assert.Len(t, receivedEvents, numEvents, "got unexpected number of events")
}

func generateRandomEvent() testEvent {
return testEvent{
Data: map[string]interface{}{
"temperature": fmt.Sprintf("%f", rand.Float64()),
"humidity": fmt.Sprintf("%f", rand.Float64()),
"sensor_id": fmt.Sprintf("%d", rand.Intn(1000)),
"location": fmt.Sprintf("%f", rand.Float64()),
"status": fmt.Sprintf("%d", rand.Intn(1000)),
"timestamp": fmt.Sprintf("%d", time.Now().UnixNano()),
"operation": "create",
},
}
}

func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < numEvents; i++ {
go func() {
err := publisher.Publish(context.Background(), generateRandomEvent())
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}()
}
}

type handler struct {
fail bool
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/events/rabbitmq/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ var (
rabbitmqURL string
stream = "tests.events"
consumer = "tests-consumer"
pool *dockertest.Pool
container *dockertest.Resource
)

func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
var err error
pool, err = dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

container, err := pool.RunWithOptions(&dockertest.RunOptions{
container, err = pool.RunWithOptions(&dockertest.RunOptions{
Repository: "rabbitmq",
Tag: "3.12.12",
})
Expand Down
28 changes: 16 additions & 12 deletions pkg/events/redis/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,24 @@ import (
)

type pubEventStore struct {
client *redis.Client
unpublishedEvents chan *redis.XAddArgs
stream string
mu sync.Mutex
client *redis.Client
unpublishedEvents chan *redis.XAddArgs
stream string
mu sync.Mutex
unpublishedEventsCheckInterval time.Duration
}

func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
func NewPublisher(ctx context.Context, url, stream string, unpublishedEventsCheckInterval time.Duration) (events.Publisher, error) {
opts, err := redis.ParseURL(url)
if err != nil {
return nil, err
}

es := &pubEventStore{
client: redis.NewClient(opts),
unpublishedEvents: make(chan *redis.XAddArgs, events.MaxUnpublishedEvents),
stream: stream,
client: redis.NewClient(opts),
unpublishedEvents: make(chan *redis.XAddArgs, events.MaxUnpublishedEvents),
stream: stream,
unpublishedEventsCheckInterval: unpublishedEventsCheckInterval,
}

go es.startPublishingRoutine(ctx)
Expand All @@ -53,7 +55,7 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
Values: values,
}

switch err := es.checkRedisConnection(ctx); err {
switch err := es.checkConnection(ctx); err {
case nil:
return es.client.XAdd(ctx, record).Err()
default:
Expand All @@ -71,16 +73,18 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
}
}

// startPublishingRoutine periodically checks the Redis connection and publishes
// the events that were not published due to a connection error.
func (es *pubEventStore) startPublishingRoutine(ctx context.Context) {
defer close(es.unpublishedEvents)

ticker := time.NewTicker(events.UnpublishedEventsCheckInterval)
ticker := time.NewTicker(es.unpublishedEventsCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := es.checkRedisConnection(ctx); err == nil {
if err := es.checkConnection(ctx); err == nil {
es.mu.Lock()
for i := len(es.unpublishedEvents) - 1; i >= 0; i-- {
record := <-es.unpublishedEvents
Expand All @@ -102,7 +106,7 @@ func (es *pubEventStore) Close() error {
return es.client.Close()
}

func (es *pubEventStore) checkRedisConnection(ctx context.Context) error {
func (es *pubEventStore) checkConnection(ctx context.Context) error {
// A timeout is used to avoid blocking the main thread
ctx, cancel := context.WithTimeout(ctx, events.ConnCheckInterval)
defer cancel()
Expand Down
Loading

0 comments on commit 844d5cf

Please sign in to comment.