From 75b65afcbf7ac47cbdd4baa964cbe6d0009f35ce Mon Sep 17 00:00:00 2001 From: b1ackd0t <28790446+rodneyosodo@users.noreply.github.com> Date: Thu, 25 Jan 2024 18:02:13 +0300 Subject: [PATCH] NOISSUE - Make Check Interval Configurable (#277) Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo --- pkg/events/nats/publisher.go | 11 -- pkg/events/nats/publisher_test.go | 143 ++++++++++++++------------ pkg/events/nats/setup_test.go | 6 +- pkg/events/rabbitmq/publisher.go | 50 ++------- pkg/events/rabbitmq/publisher_test.go | 134 +++++++++++++----------- pkg/events/rabbitmq/setup_test.go | 13 +-- pkg/events/redis/publisher.go | 18 ++-- pkg/events/redis/publisher_test.go | 92 ++++++++++++++--- pkg/events/redis/setup_test.go | 9 +- pkg/events/store/brokers_redis.go | 2 +- 10 files changed, 252 insertions(+), 226 deletions(-) diff --git a/pkg/events/nats/publisher.go b/pkg/events/nats/publisher.go index 29c07f1e7..e711f9701 100644 --- a/pkg/events/nats/publisher.go +++ b/pkg/events/nats/publisher.go @@ -50,8 +50,6 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er stream: stream, } - go es.StartPublishingRoutine(ctx) - return es, nil } @@ -74,15 +72,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error return es.publisher.Publish(ctx, es.stream, record) } -func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) { - // Nats doesn't need to check for unpublished events - // since the events are published to a buffer. - // The buffer is flushed when the connection is reestablished. - // https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer - - <-ctx.Done() -} - func (es *pubEventStore) Close() error { es.conn.Close() diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go index fc864bf16..ef6955064 100644 --- a/pkg/events/nats/publisher_test.go +++ b/pkg/events/nats/publisher_test.go @@ -23,6 +23,7 @@ var ( eventsChan = make(chan map[string]interface{}) logger = mglog.NewMock() errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -50,16 +51,19 @@ func (te testEvent) Encode() (map[string]interface{}, error) { } func TestPublish(t *testing.T) { - publisher, err := nats.NewPublisher(ctx, natsURL, stream) + _, err := nats.NewPublisher(context.Background(), "http://invaliurl.com", stream) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err := nats.NewPublisher(context.Background(), natsURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - _, err = nats.NewSubscriber(ctx, "http://invaliurl.com", stream, consumer, logger) + _, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", stream, consumer, logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - subcriber, err := nats.NewSubscriber(ctx, natsURL, stream, consumer, logger) + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(ctx, handler{}) + err = subcriber.Subscribe(context.Background(), handler{}) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -122,11 +126,9 @@ func TestPublish(t *testing.T) { for _, tc := range cases { event := testEvent{Data: tc.event} - err := publisher.Publish(ctx, event) + err := publisher.Publish(context.Background(), event) switch tc.err { case nil: - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - receivedEvent := <-eventsChan val := int64(receivedEvent["occurred_at"].(float64)) @@ -135,67 +137,18 @@ func TestPublish(t *testing.T) { delete(tc.event, "occurred_at") } - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) - assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) - + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) + assert.ErrorContains(t, err, tc.err.Error()) } } } -func TestUnavailablePublish(t *testing.T) { - _, err := nats.NewPublisher(ctx, "http://invaliurl.com", stream) - assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - - publisher, err := nats.NewPublisher(ctx, natsURL, stream) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - - err = pool.Client.PauseContainer(container.Container.ID) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) - - spawnGoroutines(publisher, t) - - err = pool.Client.UnpauseContainer(container.Container.ID) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) - - // Wait for the events to be published. - time.Sleep(events.UnpublishedEventsCheckInterval) - - err = publisher.Close() - assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) -} - -func generateRandomEvent() testEvent { - return testEvent{ - Data: map[string]interface{}{ - "temperature": fmt.Sprintf("%f", rand.Float64()), - "humidity": fmt.Sprintf("%f", rand.Float64()), - "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), - "location": fmt.Sprintf("%f", rand.Float64()), - "status": fmt.Sprintf("%d", rand.Intn(1000)), - "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), - "operation": "create", - }, - } -} - -func spawnGoroutines(publisher events.Publisher, t *testing.T) { - for i := 0; i < 1e4; i++ { - go func() { - for i := 0; i < 10; i++ { - event := generateRandomEvent() - err := publisher.Publish(ctx, event) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - } - }() - } -} - func TestPubsub(t *testing.T) { subcases := []struct { desc string @@ -256,16 +209,14 @@ func TestPubsub(t *testing.T) { } for _, pc := range subcases { - subcriber, err := nats.NewSubscriber(ctx, natsURL, pc.stream, pc.consumer, logger) + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, pc.stream, pc.consumer, logger) if err != nil { assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) continue } - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - - switch err := subcriber.Subscribe(context.TODO(), pc.handler); { + switch err := subcriber.Subscribe(context.Background(), pc.handler); { case err == nil: assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) default: @@ -277,6 +228,64 @@ func TestPubsub(t *testing.T) { } } +func TestUnavailablePublish(t *testing.T) { + publisher, err := nats.NewPublisher(context.Background(), natsURL, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = subcriber.Subscribe(context.Background(), handler{}) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) + + err = pool.Client.PauseContainer(container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + time.Sleep(1 * time.Second) + + err = pool.Client.UnpauseContainer(container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(1 * time.Second) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + // read all the events from the channel and assert that they are 10. + var receivedEvents []map[string]interface{} + for i := 0; i < numEvents; i++ { + event := <-eventsChan + receivedEvents = append(receivedEvents, event) + } + assert.Len(t, receivedEvents, numEvents, "got unexpected number of events") +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < numEvents; i++ { + go func() { + err := publisher.Publish(context.Background(), generateRandomEvent()) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + }() + } +} + type handler struct { fail bool } diff --git a/pkg/events/nats/setup_test.go b/pkg/events/nats/setup_test.go index d7965b336..143e4863b 100644 --- a/pkg/events/nats/setup_test.go +++ b/pkg/events/nats/setup_test.go @@ -20,7 +20,6 @@ var ( natsURL string stream = "tests.events" consumer = "tests-consumer" - ctx = context.Background() pool *dockertest.Pool container *dockertest.Resource ) @@ -33,7 +32,6 @@ func TestMain(m *testing.M) { } container, err = pool.RunWithOptions(&dockertest.RunOptions{ - Name: "test-nats-events", Repository: "nats", Tag: "2.10.9-alpine", Cmd: []string{"-DVV", "-js"}, @@ -47,14 +45,14 @@ func TestMain(m *testing.M) { natsURL = fmt.Sprintf("nats://%s:%s", "localhost", container.GetPort("4222/tcp")) if err := pool.Retry(func() error { - _, err = nats.NewPublisher(ctx, natsURL, stream) + _, err = nats.NewPublisher(context.Background(), natsURL, stream) return err }); err != nil { log.Fatalf("Could not connect to docker: %s", err) } if err := pool.Retry(func() error { - _, err = nats.NewSubscriber(ctx, natsURL, stream, consumer, logger) + _, err = nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) return err }); err != nil { log.Fatalf("Could not connect to docker: %s", err) diff --git a/pkg/events/rabbitmq/publisher.go b/pkg/events/rabbitmq/publisher.go index 37ca3e19b..ba7d735ae 100644 --- a/pkg/events/rabbitmq/publisher.go +++ b/pkg/events/rabbitmq/publisher.go @@ -6,7 +6,6 @@ package rabbitmq import ( "context" "encoding/json" - "sync" "time" "github.com/absmach/magistrala/pkg/events" @@ -16,11 +15,9 @@ import ( ) type pubEventStore struct { - conn *amqp.Connection - publisher messaging.Publisher - unpublishedEvents chan amqp.Return - stream string - mu sync.Mutex + conn *amqp.Connection + publisher messaging.Publisher + stream string } func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { @@ -42,16 +39,11 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er } es := &pubEventStore{ - conn: conn, - publisher: publisher, - unpublishedEvents: make(chan amqp.Return, events.MaxUnpublishedEvents), - stream: stream, + conn: conn, + publisher: publisher, + stream: stream, } - ch.NotifyReturn(es.unpublishedEvents) - - go es.StartPublishingRoutine(ctx) - return es, nil } @@ -74,36 +66,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error return es.publisher.Publish(ctx, es.stream, record) } -func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) { - defer close(es.unpublishedEvents) - - ticker := time.NewTicker(events.UnpublishedEventsCheckInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if ok := es.conn.IsClosed(); !ok { - es.mu.Lock() - for i := len(es.unpublishedEvents) - 1; i >= 0; i-- { - record := <-es.unpublishedEvents - msg := &messaging.Message{ - Payload: record.Body, - } - if err := es.publisher.Publish(ctx, es.stream, msg); err != nil { - es.unpublishedEvents <- record - - break - } - } - es.mu.Unlock() - } - case <-ctx.Done(): - return - } - } -} - func (es *pubEventStore) Close() error { es.conn.Close() diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go index c9c83fbf6..252784a3c 100644 --- a/pkg/events/rabbitmq/publisher_test.go +++ b/pkg/events/rabbitmq/publisher_test.go @@ -23,6 +23,7 @@ var ( eventsChan = make(chan map[string]interface{}) logger = mglog.NewMock() errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -50,7 +51,10 @@ func (te testEvent) Encode() (map[string]interface{}, error) { } func TestPublish(t *testing.T) { - publisher, err := rabbitmq.NewPublisher(ctx, rabbitmqURL, stream) + _, err := rabbitmq.NewPublisher(context.Background(), "http://invaliurl.com", stream) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) _, err = rabbitmq.NewSubscriber("http://invaliurl.com", stream, consumer, logger) @@ -59,7 +63,7 @@ func TestPublish(t *testing.T) { subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(ctx, handler{}) + err = subcriber.Subscribe(context.Background(), handler{}) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -122,11 +126,9 @@ func TestPublish(t *testing.T) { for _, tc := range cases { event := testEvent{Data: tc.event} - err := publisher.Publish(ctx, event) + err := publisher.Publish(context.Background(), event) switch tc.err { case nil: - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - receivedEvent := <-eventsChan val := int64(receivedEvent["occurred_at"].(float64)) @@ -135,12 +137,12 @@ func TestPublish(t *testing.T) { delete(tc.event, "occurred_at") } - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) - assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) default: assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) @@ -148,54 +150,6 @@ func TestPublish(t *testing.T) { } } -func TestUnavailablePublish(t *testing.T) { - _, err := rabbitmq.NewPublisher(ctx, "http://invaliurl.com", stream) - assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - - publisher, err := rabbitmq.NewPublisher(ctx, rabbitmqURL, stream) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - - err = pool.Client.PauseContainer(container.Container.ID) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) - - spawnGoroutines(publisher, t) - - err = pool.Client.UnpauseContainer(container.Container.ID) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) - - // Wait for the events to be published. - time.Sleep(2 * events.UnpublishedEventsCheckInterval) - - err = publisher.Close() - assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) -} - -func generateRandomEvent() testEvent { - return testEvent{ - Data: map[string]interface{}{ - "temperature": fmt.Sprintf("%f", rand.Float64()), - "humidity": fmt.Sprintf("%f", rand.Float64()), - "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), - "location": fmt.Sprintf("%f", rand.Float64()), - "status": fmt.Sprintf("%d", rand.Intn(1000)), - "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), - "operation": "create", - }, - } -} - -func spawnGoroutines(publisher events.Publisher, t *testing.T) { - for i := 0; i < 1e4; i++ { - go func() { - for i := 0; i < 10; i++ { - event := generateRandomEvent() - err := publisher.Publish(ctx, event) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - } - }() - } -} - func TestPubsub(t *testing.T) { subcases := []struct { desc string @@ -263,9 +217,7 @@ func TestPubsub(t *testing.T) { continue } - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - - switch err := subcriber.Subscribe(ctx, pc.handler); { + switch err := subcriber.Subscribe(context.Background(), pc.handler); { case err == nil: assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) default: @@ -277,6 +229,64 @@ func TestPubsub(t *testing.T) { } } +func TestUnavailablePublish(t *testing.T) { + publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = subcriber.Subscribe(context.Background(), handler{}) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) + + err = pool.Client.PauseContainer(container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + time.Sleep(1 * time.Second) + + err = pool.Client.UnpauseContainer(container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(1 * time.Second) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + // read all the events from the channel and assert that they are 10. + var receivedEvents []map[string]interface{} + for i := 0; i < numEvents; i++ { + event := <-eventsChan + receivedEvents = append(receivedEvents, event) + } + assert.Len(t, receivedEvents, numEvents, "got unexpected number of events") +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < numEvents; i++ { + go func() { + err := publisher.Publish(context.Background(), generateRandomEvent()) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + }() + } +} + type handler struct { fail bool } diff --git a/pkg/events/rabbitmq/setup_test.go b/pkg/events/rabbitmq/setup_test.go index a36a2063b..4db3c7c25 100644 --- a/pkg/events/rabbitmq/setup_test.go +++ b/pkg/events/rabbitmq/setup_test.go @@ -20,9 +20,8 @@ var ( rabbitmqURL string stream = "tests.events" consumer = "tests-consumer" - ctx = context.TODO() - pool = &dockertest.Pool{} - container = &dockertest.Resource{} + pool *dockertest.Pool + container *dockertest.Resource ) func TestMain(m *testing.M) { @@ -32,12 +31,10 @@ func TestMain(m *testing.M) { log.Fatalf("Could not connect to docker: %s", err) } - opts := dockertest.RunOptions{ - Name: "test-rabbitmq-events", + container, err = pool.RunWithOptions(&dockertest.RunOptions{ Repository: "rabbitmq", Tag: "3.12.12", - } - container, err = pool.RunWithOptions(&opts) + }) if err != nil { log.Fatalf("Could not start container: %s", err) } @@ -47,7 +44,7 @@ func TestMain(m *testing.M) { rabbitmqURL = fmt.Sprintf("amqp://%s:%s", "localhost", container.GetPort("5672/tcp")) if err := pool.Retry(func() error { - _, err = rabbitmq.NewPublisher(ctx, rabbitmqURL, stream) + _, err = rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) return err }); err != nil { log.Fatalf("Could not connect to docker: %s", err) diff --git a/pkg/events/redis/publisher.go b/pkg/events/redis/publisher.go index c0e7a1688..a6f5c90ac 100644 --- a/pkg/events/redis/publisher.go +++ b/pkg/events/redis/publisher.go @@ -20,9 +20,10 @@ type pubEventStore struct { unpublishedEvents chan *redis.XAddArgs stream string mu sync.Mutex + flushPeriod time.Duration } -func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { +func NewPublisher(ctx context.Context, url, stream string, flushPeriod time.Duration) (events.Publisher, error) { opts, err := redis.ParseURL(url) if err != nil { return nil, err @@ -32,9 +33,10 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er client: redis.NewClient(opts), unpublishedEvents: make(chan *redis.XAddArgs, events.MaxUnpublishedEvents), stream: stream, + flushPeriod: flushPeriod, } - go es.startPublishingRoutine(ctx) + go es.flushUnpublished(ctx) return es, nil } @@ -53,7 +55,7 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error Values: values, } - switch err := es.checkRedisConnection(ctx); err { + switch err := es.checkConnection(ctx); err { case nil: return es.client.XAdd(ctx, record).Err() default: @@ -71,16 +73,18 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error } } -func (es *pubEventStore) startPublishingRoutine(ctx context.Context) { +// flushUnpublished periodically checks the Redis connection and publishes +// the events that were not published due to a connection error. +func (es *pubEventStore) flushUnpublished(ctx context.Context) { defer close(es.unpublishedEvents) - ticker := time.NewTicker(events.UnpublishedEventsCheckInterval) + ticker := time.NewTicker(es.flushPeriod) defer ticker.Stop() for { select { case <-ticker.C: - if err := es.checkRedisConnection(ctx); err == nil { + if err := es.checkConnection(ctx); err == nil { es.mu.Lock() for i := len(es.unpublishedEvents) - 1; i >= 0; i-- { record := <-es.unpublishedEvents @@ -102,7 +106,7 @@ func (es *pubEventStore) Close() error { return es.client.Close() } -func (es *pubEventStore) checkRedisConnection(ctx context.Context) error { +func (es *pubEventStore) checkConnection(ctx context.Context) error { // A timeout is used to avoid blocking the main thread ctx, cancel := context.WithTimeout(ctx, events.ConnCheckInterval) defer cancel() diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index 82ef61589..9bf226dd7 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -29,7 +29,7 @@ var ( eventsChan = make(chan map[string]interface{}) logger = mglog.NewMock() errFailed = errors.New("failed") - ctx = context.TODO() + numEvents = 100 ) type testEvent struct { @@ -57,19 +57,22 @@ func (te testEvent) Encode() (map[string]interface{}, error) { } func TestPublish(t *testing.T) { - err := redisClient.FlushAll(ctx).Err() + err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - publisher, err := redis.NewPublisher(ctx, redisURL, streamName) + _, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", streamName, events.UnpublishedEventsCheckInterval) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, events.UnpublishedEventsCheckInterval) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := redis.NewSubscriber("http://invaliurl.com", streamName, consumer, logger) + _, err = redis.NewSubscriber("http://invaliurl.com", streamName, consumer, logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - subcriber, err = redis.NewSubscriber(redisURL, streamName, consumer, logger) + subcriber, err := redis.NewSubscriber(redisURL, streamName, consumer, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(ctx, handler{}) + err = subcriber.Subscribe(context.Background(), handler{}) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -132,11 +135,9 @@ func TestPublish(t *testing.T) { for _, tc := range cases { event := testEvent{Data: tc.event} - err := publisher.Publish(ctx, event) + err := publisher.Publish(context.Background(), event) switch tc.err { case nil: - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - receivedEvent := <-eventsChan roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) @@ -146,12 +147,12 @@ func TestPublish(t *testing.T) { delete(tc.event, "occurred_at") } - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) - assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) default: assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) @@ -160,7 +161,7 @@ func TestPublish(t *testing.T) { } func TestPubsub(t *testing.T) { - err := redisClient.FlushAll(ctx).Err() + err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) subcases := []struct { @@ -231,7 +232,7 @@ func TestPubsub(t *testing.T) { assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - switch err := subcriber.Subscribe(context.TODO(), pc.handler); { + switch err := subcriber.Subscribe(context.Background(), pc.handler); { case err == nil: assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) default: @@ -243,6 +244,63 @@ func TestPubsub(t *testing.T) { } } +func TestUnavailablePublish(t *testing.T) { + publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, time.Second) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + subcriber, err := redis.NewSubscriber(redisURL, streamName, consumer, logger) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = subcriber.Subscribe(context.Background(), handler{}) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) + + err = pool.Client.PauseContainer(container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + time.Sleep(1 * time.Second) + + err = pool.Client.UnpauseContainer(container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(1 * time.Second) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + var receivedEvents []map[string]interface{} + for i := 0; i < numEvents; i++ { + event := <-eventsChan + receivedEvents = append(receivedEvents, event) + } + assert.Len(t, receivedEvents, numEvents, "got unexpected number of events") +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < numEvents; i++ { + go func() { + err := publisher.Publish(context.Background(), generateRandomEvent()) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + }() + } +} + type handler struct { fail bool } diff --git a/pkg/events/redis/setup_test.go b/pkg/events/redis/setup_test.go index 851c3f120..541cb2a3d 100644 --- a/pkg/events/redis/setup_test.go +++ b/pkg/events/redis/setup_test.go @@ -7,6 +7,7 @@ package redis_test import ( + "context" "fmt" "log" "os" @@ -32,12 +33,10 @@ func TestMain(m *testing.M) { log.Fatalf("Could not connect to docker: %s", err) } - opts := dockertest.RunOptions{ - Name: "tests-redis-events", + container, err = pool.RunWithOptions(&dockertest.RunOptions{ Repository: "redis", Tag: "7.2.4-alpine", - } - container, err = pool.RunWithOptions(&opts) + }) if err != nil { log.Fatalf("Could not start container: %s", err) } @@ -53,7 +52,7 @@ func TestMain(m *testing.M) { if err := pool.Retry(func() error { redisClient = redis.NewClient(ropts) - return redisClient.Ping(ctx).Err() + return redisClient.Ping(context.Background()).Err() }); err != nil { log.Fatalf("Could not connect to docker: %s", err) } diff --git a/pkg/events/store/brokers_redis.go b/pkg/events/store/brokers_redis.go index f88f093e4..711310123 100644 --- a/pkg/events/store/brokers_redis.go +++ b/pkg/events/store/brokers_redis.go @@ -20,7 +20,7 @@ func init() { } func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { - pb, err := redis.NewPublisher(ctx, url, stream) + pb, err := redis.NewPublisher(ctx, url, stream, events.UnpublishedEventsCheckInterval) if err != nil { return nil, err }