Add more tests #38

Merged 1 commit on Oct 20, 2024
50 changes: 27 additions & 23 deletions pool/node.go
@@ -338,7 +338,7 @@ func (node *Node) Shutdown(ctx context.Context) error {
 	}
 	if node.clientOnly {
 		node.lock.Unlock()
-		return fmt.Errorf("Shutdown: pool %q is client-only", node.Name)
+		return fmt.Errorf("Shutdown: client-only node cannot shutdown worker pool")
 	}
 	node.lock.Unlock()
 	node.logger.Info("shutting down")
@@ -401,31 +401,32 @@ func (node *Node) close(ctx context.Context, requeue bool) error {
 	node.logger.Info("closing")
 	node.closing = true
 
-	// Need to stop workers before requeueing jobs to prevent
-	// requeued jobs from being handled by this node.
-	var wg sync.WaitGroup
-	node.logger.Debug("stopping workers", "count", len(node.localWorkers))
-	for _, w := range node.localWorkers {
-		wg.Add(1)
-		go func(w *Worker) {
-			defer wg.Done()
-			w.stopAndWait(ctx)
-		}(w)
-	}
-	wg.Wait()
-	node.logger.Debug("workers stopped")
-
-	for _, w := range node.localWorkers {
-		if requeue {
-			if err := w.requeueJobs(ctx); err != nil {
-				node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", w.ID, err))
-				continue
+	if len(node.localWorkers) > 0 {
+		// Need to stop workers before requeueing jobs to prevent
+		// requeued jobs from being handled by this node.
+		var wg sync.WaitGroup
+		node.logger.Debug("stopping workers", "count", len(node.localWorkers))
+		for _, w := range node.localWorkers {
+			wg.Add(1)
+			go func(w *Worker) {
+				defer wg.Done()
+				w.stopAndWait(ctx)
+			}(w)
+		}
+		wg.Wait()
+		node.logger.Debug("workers stopped")
+		for _, w := range node.localWorkers {
+			if requeue {
+				if err := w.requeueJobs(ctx); err != nil {
+					node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", w.ID, err))
+					continue
+				}
 			}
+			w.cleanup(ctx)
 		}
-		w.cleanup(ctx)
+		node.localWorkers = nil
 	}
 
-	node.localWorkers = nil
 	if !node.clientOnly {
 		node.poolSink.Close()
 		node.tickerMap.Close()
@@ -532,7 +533,10 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
 	key := workerID + ":" + ack.EventID
 	pending, ok := node.pendingEvents[key]
 	if !ok {
-		node.logger.Error(fmt.Errorf("ackWorkerEvent: received event %s from worker %s that was not dispatched", ack.EventID, workerID))
+		node.logger.Error(fmt.Errorf("ackWorkerEvent: received unknown event %s from worker %s", ack.EventID, workerID))
+		if err := node.poolSink.Ack(ctx, pending); err != nil {
+			node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to ack unknown event: %w", err), "event", pending.EventName, "id", pending.ID)
+		}
 		return
 	}
 
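Note on the close() hunk above: the ordering is the load-bearing detail — every worker must be fully stopped before any jobs are requeued, otherwise the closing node could pick its own requeued jobs back up. A minimal, self-contained sketch of that stop-then-requeue pattern (the worker type and its methods are hypothetical stand-ins, not the pulse API):

package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for pool.Worker.
type worker struct{ id string }

func (w *worker) stopAndWait()       { fmt.Println("stopped", w.id) }
func (w *worker) requeueJobs() error { fmt.Println("requeued", w.id); return nil }

func main() {
	workers := []*worker{{id: "w1"}, {id: "w2"}}

	// Stop every worker first, concurrently, so none of them can pick
	// the requeued jobs back up on this node.
	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go func(w *worker) {
			defer wg.Done()
			w.stopAndWait()
		}(w)
	}
	wg.Wait()

	// Only after all workers are stopped is it safe to requeue.
	for _, w := range workers {
		if err := w.requeueJobs(); err != nil {
			fmt.Println("requeue failed:", err)
		}
	}
}

Wrapping the sequence in the new `if len(node.localWorkers) > 0` guard simply makes the teardown a no-op for nodes that never had local workers, such as client-only nodes.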
156 changes: 156 additions & 0 deletions pool/node_test.go
@@ -1,13 +1,17 @@
 package pool
 
 import (
+	"context"
+	"fmt"
 	"strings"
 	"testing"
 	"time"
 
+	"github.com/redis/go-redis/v9"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"goa.design/pulse/streaming"
 	ptesting "goa.design/pulse/testing"
 )
 
@@ -90,6 +94,82 @@ func TestDispatchJobTwoWorkers(t *testing.T) {
 	assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
 }
 
+func TestNotifyWorker(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	// Create a worker
+	worker := newTestWorker(t, ctx, node)
+
+	// Set up notification handling
+	jobKey := "test-job"
+	jobPayload := []byte("job payload")
+	notificationPayload := []byte("test notification")
+	ch := make(chan []byte, 1)
+	worker.handler.(*mockHandler).notifyFunc = func(key string, payload []byte) error {
+		assert.Equal(t, jobKey, key, "Received notification for the wrong key")
+		assert.Equal(t, notificationPayload, payload, "Received notification for the wrong payload")
+		close(ch)
+		return nil
+	}
+
+	// Dispatch a job to ensure the worker is assigned
+	require.NoError(t, node.DispatchJob(ctx, jobKey, jobPayload))
+
+	// Send a notification
+	err := node.NotifyWorker(ctx, jobKey, notificationPayload)
+	require.NoError(t, err, "Failed to send notification")
+
+	// Wait for the notification to be received
+	select {
+	case <-ch:
+	case <-time.After(max):
+		t.Fatal("Timeout waiting for notification to be received")
+	}
+
+	// Shutdown node
+	assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
+}
+
+func TestNotifyWorkerNoHandler(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx, buf := ptesting.NewBufferedLogContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	// Create a worker without NotificationHandler implementation
+	worker := newTestWorkerWithoutNotify(t, ctx, node)
+
+	// Dispatch a job to ensure the worker is assigned
+	jobKey := "test-job"
+	jobPayload := []byte("job payload")
+	require.NoError(t, node.DispatchJob(ctx, jobKey, jobPayload))
+
+	// Wait for the job to be received by the worker
+	require.Eventually(t, func() bool {
+		return len(worker.Jobs()) == 1
+	}, max, delay, "Job was not received by the worker")
+
+	// Send a notification
+	notificationPayload := []byte("test notification")
+	assert.NoError(t, node.NotifyWorker(ctx, jobKey, notificationPayload), "Failed to send notification")
+
+	// Check that an error was logged
+	assert.Eventually(t, func() bool {
+		return strings.Contains(buf.String(), "worker does not implement NotificationHandler, ignoring notification")
+	}, max, delay, "Expected error message was not logged within the timeout period")
+
+	// Ensure the worker is still functioning
+	assert.Len(t, worker.Jobs(), 1, "Worker should still have the job")
+
+	// Shutdown node
+	assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
+}
+
 func TestRemoveWorkerThenShutdown(t *testing.T) {
 	ctx := ptesting.NewTestContext(t)
 	testName := strings.Replace(t.Name(), "/", "_", -1)
@@ -225,3 +305,79 @@ func TestNodeCloseAndRequeue(t *testing.T) {
 	// Clean up
 	require.NoError(t, node2.Shutdown(ctx), "Failed to shutdown node2")
 }
+
+func TestStaleEventsAreRemoved(t *testing.T) {
+	// Setup
+	ctx := ptesting.NewTestContext(t)
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	rdb := ptesting.NewRedisClient(t)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer func() { assert.NoError(t, node.Shutdown(ctx)) }()
+
+	// Add a stale event manually
+	staleEventID := fmt.Sprintf("%d-0", time.Now().Add(-2*node.pendingJobTTL).UnixNano()/int64(time.Millisecond))
+	staleEvent := &streaming.Event{
+		ID:        staleEventID,
+		EventName: "test-event",
+		Payload:   []byte("test-payload"),
+		Acker: &mockAcker{
+			XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+				return redis.NewIntCmd(ctx, 0)
+			},
+		},
+	}
+	node.pendingEvents["worker:stale-event-id"] = staleEvent
+
+	// Add a fresh event
+	freshEventID := fmt.Sprintf("%d-0", time.Now().Add(-time.Second).UnixNano()/int64(time.Millisecond))
+	freshEvent := &streaming.Event{
+		ID:        freshEventID,
+		EventName: "test-event",
+		Payload:   []byte("test-payload"),
+		Acker: &mockAcker{
+			XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+				return redis.NewIntCmd(ctx, 0)
+			},
+		},
+	}
+	node.pendingEvents["worker:fresh-event-id"] = freshEvent
+
+	// Create a mock event to trigger the ackWorkerEvent function
+	mockEvent := &streaming.Event{
+		ID:        "mock-event-id",
+		EventName: evAck,
+		Payload:   marshalEnvelope("worker", marshalAck(&ack{EventID: "mock-event-id"})),
+		Acker: &mockAcker{
+			XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+				return redis.NewIntCmd(ctx, 0)
+			},
+		},
+	}
+	node.pendingEvents["worker:mock-event-id"] = mockEvent
+
+	// Call ackWorkerEvent to trigger the stale event cleanup
+	node.ackWorkerEvent(ctx, mockEvent)
+
+	assert.Eventually(t, func() bool {
+		node.lock.Lock()
+		defer node.lock.Unlock()
+		_, exists := node.pendingEvents["worker:stale-event-id"]
+		return !exists
+	}, max, delay, "Stale event should have been removed")
+
+	assert.Eventually(t, func() bool {
+		node.lock.Lock()
+		defer node.lock.Unlock()
+		_, exists := node.pendingEvents["worker:fresh-event-id"]
+		return exists
+	}, max, delay, "Fresh event should still be present")
+}
+
+type mockAcker struct {
+	XAckFunc func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
+}
+
+func (m *mockAcker) XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+	return m.XAckFunc(ctx, streamKey, sinkName, ids...)
+}
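Note on TestStaleEventsAreRemoved above: it fabricates event IDs in the Redis stream entry format "<unix-ms>-<seq>", which is what lets it backdate an event beyond node.pendingJobTTL. A standalone sketch of that encoding (parseEventAge is a hypothetical helper for illustration, not repo code):

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parseEventAge extracts the millisecond timestamp prefix of a Redis
// stream ID and returns how long ago the entry was created.
func parseEventAge(id string) (time.Duration, error) {
	msPart, _, ok := strings.Cut(id, "-")
	if !ok {
		return 0, fmt.Errorf("malformed stream ID %q", id)
	}
	ms, err := strconv.ParseInt(msPart, 10, 64)
	if err != nil {
		return 0, err
	}
	return time.Since(time.UnixMilli(ms)), nil
}

func main() {
	ttl := 2 * time.Minute
	// Same construction the test uses: a timestamp twice the TTL in the past.
	staleID := fmt.Sprintf("%d-0", time.Now().Add(-2*ttl).UnixNano()/int64(time.Millisecond))
	age, _ := parseEventAge(staleID)
	fmt.Printf("id %s is %s old (stale: %v)\n", staleID, age, age > ttl)
}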
30 changes: 27 additions & 3 deletions pool/testing.go
@@ -14,7 +14,13 @@ import (
 type mockHandler struct {
 	startFunc  func(job *Job) error
 	stopFunc   func(key string) error
-	notifyFunc func(payload []byte) error
+	notifyFunc func(key string, payload []byte) error
+}
+
+// mockHandlerWithoutNotify is a mock handler that doesn't implement NotificationHandler
+type mockHandlerWithoutNotify struct {
+	startFunc func(job *Job) error
+	stopFunc  func(key string) error
 }
 
 const (
@@ -47,7 +53,20 @@ func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker {
 	handler := &mockHandler{
 		startFunc:  func(job *Job) error { return nil },
 		stopFunc:   func(key string) error { return nil },
-		notifyFunc: func(payload []byte) error { return nil },
+		notifyFunc: func(key string, payload []byte) error { return nil },
+	}
+	worker, err := node.AddWorker(ctx, handler)
+	require.NoError(t, err)
+	return worker
+}
+
+// newTestWorkerWithoutNotify creates a new Worker instance for testing purposes.
+// It sets up a mock handler without NotificationHandler for testing.
+func newTestWorkerWithoutNotify(t *testing.T, ctx context.Context, node *Node) *Worker {
+	t.Helper()
+	handler := &mockHandlerWithoutNotify{
+		startFunc: func(job *Job) error { return nil },
+		stopFunc:  func(key string) error { return nil },
 	}
 	worker, err := node.AddWorker(ctx, handler)
 	require.NoError(t, err)
@@ -56,4 +75,9 @@ func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker {
 
 func (w *mockHandler) Start(job *Job) error  { return w.startFunc(job) }
 func (w *mockHandler) Stop(key string) error { return w.stopFunc(key) }
-func (w *mockHandler) Notify(p []byte) error { return w.notifyFunc(p) }
+func (w *mockHandler) HandleNotification(key string, payload []byte) error {
+	return w.notifyFunc(key, payload)
+}
+
+func (h *mockHandlerWithoutNotify) Start(job *Job) error  { return h.startFunc(job) }
+func (h *mockHandlerWithoutNotify) Stop(key string) error { return h.stopFunc(key) }
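Note on the two mock handlers above: pulse treats NotificationHandler as an optional interface — a handler opts in by implementing HandleNotification, and the worker detects this with a runtime type assertion, which is exactly the check mockHandlerWithoutNotify is designed to fail. A standalone sketch of the pattern (the dispatch function and handler types are illustrative only; the interface signature mirrors the diff):

package main

import "fmt"

type NotificationHandler interface {
	HandleNotification(key string, payload []byte) error
}

type withNotify struct{}

func (withNotify) HandleNotification(key string, payload []byte) error {
	fmt.Printf("notified %s: %s\n", key, payload)
	return nil
}

type withoutNotify struct{}

// dispatch delivers a notification only if the handler opted in.
func dispatch(h any, key string, payload []byte) {
	nh, ok := h.(NotificationHandler)
	if !ok {
		fmt.Println("handler does not implement NotificationHandler, ignoring notification")
		return
	}
	_ = nh.HandleNotification(key, payload)
}

func main() {
	dispatch(withNotify{}, "job", []byte("hi"))    // delivered
	dispatch(withoutNotify{}, "job", []byte("hi")) // ignored and logged
}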
2 changes: 1 addition & 1 deletion pool/worker.go
@@ -285,7 +285,7 @@ func (w *Worker) notify(_ context.Context, key string, payload []byte) error {
 	}
 	nh, ok := w.handler.(NotificationHandler)
 	if !ok {
-		w.logger.Debug("worker does not implement NotificationHandler, ignoring notification")
+		w.logger.Error(fmt.Errorf("worker does not implement NotificationHandler, ignoring notification"), "worker", w.ID)
 		return nil
 	}
 	w.logger.Debug("handled notification", "payload", string(payload))
11 changes: 8 additions & 3 deletions streaming/reader.go
@@ -59,6 +59,11 @@ type (
 		rdb *redis.Client
 	}
 
+	// Acker is the interface used by events to acknowledge themselves.
+	Acker interface {
+		XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
+	}
+
 	// Event is a stream event.
 	Event struct {
 		// ID is the unique event ID.
@@ -73,10 +78,10 @@ type (
 		Topic string
 		// Payload is the event payload.
 		Payload []byte
+		// Acker is the redis client used to acknowledge events.
+		Acker Acker
 		// streamKey is the Redis key of the stream.
 		streamKey string
-		// rdb is the redis client.
-		rdb *redis.Client
 	}
 )

@@ -314,7 +319,7 @@ func streamEvents(
 				Topic:     topic,
 				Payload:   []byte(event.Values[payloadKey].(string)),
 				streamKey: streamKey,
-				rdb:       rdb,
+				Acker:     rdb,
 			}
 			if eventFilter != nil && !eventFilter(ev) {
 				logger.Debug("event filtered", "event", ev.EventName, "id", ev.ID, "stream", streamName)
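Note on the Acker change above: replacing the concrete *redis.Client field with a single-method interface narrows Event's dependency to exactly what Sink.Ack calls, which is what lets node_test.go substitute mockAcker. A standalone sketch showing that *redis.Client still satisfies the interface (the Acker definition mirrors the diff; the enclosing main package is illustrative, assuming go-redis v9's XAck signature):

package main

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// Acker mirrors the interface added in streaming/reader.go.
type Acker interface {
	XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
}

// Compile-time check: the concrete go-redis client implements Acker,
// so production code keeps passing rdb while tests pass a fake.
var _ Acker = (*redis.Client)(nil)

func main() {}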
2 changes: 1 addition & 1 deletion streaming/sink.go
@@ -197,7 +197,7 @@ func (s *Sink) Unsubscribe(c <-chan *Event) {
 
 // Ack acknowledges the event.
 func (s *Sink) Ack(ctx context.Context, e *Event) error {
-	err := e.rdb.XAck(ctx, e.streamKey, e.SinkName, e.ID).Err()
+	err := e.Acker.XAck(ctx, e.streamKey, e.SinkName, e.ID).Err()
 	if err != nil {
 		s.logger.Error(err, "ack", e.ID, "stream", e.StreamName)
 		return err