From 18125d34b1ec03fdcee5b0a20c10f24c1dc2cbfe Mon Sep 17 00:00:00 2001
From: Raphael Simon
Date: Sun, 20 Oct 2024 20:03:11 -0700
Subject: [PATCH] Add ability to list jobs and workers

Fix a couple of potential panics:

1. If `rmap.RemoveValues` doesn't remove a value.
2. When acking a non-existent event.
---
 pool/node.go      |  62 +++++++++++++--
 pool/node_test.go | 192 ++++++++++++++++++++++++++++++++++++++++++++++
 pool/worker.go    |   9 ++-
 rmap/map.go       |   2 +-
 4 files changed, 257 insertions(+), 8 deletions(-)

diff --git a/pool/node.go b/pool/node.go
index 8d2a4d2..4cd3d90 100644
--- a/pool/node.go
+++ b/pool/node.go
@@ -7,8 +7,10 @@ import (
 	"hash"
 	"hash/crc64"
 	"io"
+	"slices"
 	"sort"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -261,13 +263,39 @@ func (node *Node) Workers() []*Worker {
 	node.lock.Lock()
 	defer node.lock.Unlock()
 	workers := make([]*Worker, len(node.localWorkers))
-	copy(workers, node.localWorkers)
+	for i, w := range node.localWorkers {
+		workers[i] = &Worker{
+			ID:        w.ID,
+			CreatedAt: w.CreatedAt,
+			Node:      node,
+		}
+	}
 	return workers
 }
 
+// PoolWorkers returns the list of workers running in the pool.
+func (node *Node) PoolWorkers() []*Worker {
+	workers := node.workerMap.Map()
+	poolWorkers := make([]*Worker, 0, len(workers))
+	for id, createdAt := range workers {
+		cat, err := strconv.ParseInt(createdAt, 10, 64)
+		if err != nil {
+			node.logger.Error(fmt.Errorf("PoolWorkers: failed to parse createdAt %q for worker %q: %w", createdAt, id, err))
+			continue
+		}
+		poolWorkers = append(poolWorkers, &Worker{ID: id, CreatedAt: time.Unix(0, cat), Node: node})
+	}
+	return poolWorkers
+}
+
 // DispatchJob dispatches a job to the proper worker in the pool.
-// It returns the error returned by the worker's start handler if any.
-// If the context is done before the job is dispatched, the context error is returned.
+// It returns:
+// - nil if the job is successfully dispatched and started by a worker
+// - an error returned by the worker's start handler if the job fails to start
+// - the context error if the context is canceled before the job is started
+// - an error if the pool is closed or if there's a failure in adding the job
+//
+// The method blocks until one of the above conditions is met.
 func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) error {
 	// Send job to pool stream.
 	node.lock.Lock()
@@ -313,6 +341,31 @@ func (node *Node) StopJob(ctx context.Context, key string) error {
 	return nil
 }
 
+// JobKeys returns the list of keys of the jobs running in the pool.
+func (node *Node) JobKeys() []string {
+	var jobKeys []string
+	jobByNodes := node.jobsMap.Map()
+	for _, jobs := range jobByNodes {
+		jobKeys = append(jobKeys, strings.Split(jobs, ",")...)
+	}
+	return jobKeys
+}
+
+// JobPayload returns the payload of the job with the given key.
+// It returns:
+// - (payload, true) if the job exists and has a payload
+// - (nil, true) if the job exists but has no payload (empty payload)
+// - (nil, false) if the job does not exist
+func (node *Node) JobPayload(key string) ([]byte, bool) {
+	payload, ok := node.jobPayloadsMap.Get(key)
+	if ok {
+		return []byte(payload), true
+	}
+	keys := node.JobKeys()
+	return nil, slices.Contains(keys, key)
+
+}
+
 // NotifyWorker notifies the worker that handles the job with the given key.
 func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) error {
 	node.lock.Lock()
@@ -534,9 +587,6 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
 	pending, ok := node.pendingEvents[key]
 	if !ok {
 		node.logger.Error(fmt.Errorf("ackWorkerEvent: received unknown event %s from worker %s", ack.EventID, workerID))
-		if err := node.poolSink.Ack(ctx, pending); err != nil {
-			node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to ack unknown event: %w", err), "event", pending.EventName, "id", pending.ID)
-		}
 		return
 	}
 
diff --git a/pool/node_test.go b/pool/node_test.go
index 1c8a34e..a9822c0 100644
--- a/pool/node_test.go
+++ b/pool/node_test.go
@@ -22,6 +22,170 @@ const (
 	max = time.Second
 )
 
+func TestWorkers(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	// Create a few workers
+	worker1 := newTestWorker(t, ctx, node)
+	worker2 := newTestWorker(t, ctx, node)
+	worker3 := newTestWorker(t, ctx, node)
+
+	// Get the list of workers
+	workers := node.Workers()
+
+	// Check if the number of workers is correct
+	assert.Equal(t, 3, len(workers), "Expected 3 workers")
+
+	// Check if all created workers are in the list
+	expectedWorkers := []string{worker1.ID, worker2.ID, worker3.ID}
+	actualWorkers := make([]string, len(workers))
+	for i, w := range workers {
+		actualWorkers[i] = w.ID
+	}
+	assert.ElementsMatch(t, expectedWorkers, actualWorkers, "The list of workers should contain all created workers")
+
+	// Shutdown node
+	assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
+}
+
+func TestPoolWorkers(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	// Create workers on the current node
+	worker1 := newTestWorker(t, ctx, node)
+	worker2 := newTestWorker(t, ctx, node)
+
+	// Create a worker on a different node
+	otherNode := newTestNode(t, ctx, rdb, testName)
+	worker3 := newTestWorker(t, ctx, otherNode)
+	defer func() { assert.NoError(t, otherNode.Shutdown(ctx)) }()
+
+	// Check if the number of workers is correct (should include workers from all nodes)
+	assert.Eventually(t, func() bool {
+		return len(node.PoolWorkers()) == 3
+	}, max, delay, "Expected 3 workers in the pool")
+
+	// Check if all created workers are in the list
+	poolWorkers := node.PoolWorkers()
+	workerIDs := make([]string, len(poolWorkers))
+	for i, w := range poolWorkers {
+		workerIDs[i] = w.ID
+	}
+
+	expectedWorkerIDs := []string{worker1.ID, worker2.ID, worker3.ID}
+	assert.ElementsMatch(t, expectedWorkerIDs, workerIDs, "Not all expected workers were found in the pool")
+
+	// Shutdown nodes
+	assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
+}
+
+func TestJobKeys(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	node1 := newTestNode(t, ctx, rdb, testName)
+	node2 := newTestNode(t, ctx, rdb, testName)
+	newTestWorker(t, ctx, node1)
+	newTestWorker(t, ctx, node2)
+	defer func() {
+		assert.NoError(t, node1.Shutdown(ctx))
+		assert.NoError(t, node2.Shutdown(ctx))
+	}()
+
+	// Configure nodes to send jobs to specific workers
+	node1.h, node2.h = &ptesting.Hasher{Index: 0}, &ptesting.Hasher{Index: 1}
+
+	jobs := []struct {
+		key     string
+		payload []byte
+	}{
+		{key: "job1", payload: []byte("payload1")},
+		{key: "job2", payload: []byte("payload2")},
+		{key: "job3", payload: []byte("payload3")},
+		{key: "job4", payload: []byte("payload4")},
+	}
+
+	for _, job := range jobs {
+		assert.NoError(t, node1.DispatchJob(ctx, job.key, job.payload), fmt.Sprintf("Failed to dispatch job: %s", job.key))
+	}
+
+	// Get job keys from the pool and check if all dispatched job keys are present
+	var allJobKeys []string
+	assert.Eventually(t, func() bool {
+		allJobKeys = node1.JobKeys()
+		return len(jobs) == len(allJobKeys)
+	}, max, delay, fmt.Sprintf("Number of job keys doesn't match the number of dispatched jobs: %d != %d", len(jobs), len(allJobKeys)))
+	for _, job := range jobs {
+		assert.Contains(t, allJobKeys, job.key, fmt.Sprintf("Job key %s not found in JobKeys", job.key))
+	}
+
+	// Dispatch a job with an existing key to node1
+	assert.NoError(t, node1.DispatchJob(ctx, "job1", []byte("updated payload")), "Failed to dispatch job with existing key")
+
+	// Check that the number of job keys hasn't changed
+	updatedAllJobKeys := node1.JobKeys()
+	assert.Equal(t, len(allJobKeys), len(updatedAllJobKeys), "Number of job keys shouldn't change when updating an existing job")
+}
+
+func TestJobPayload(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	node := newTestNode(t, ctx, rdb, testName)
+	newTestWorker(t, ctx, node)
+	defer func() { assert.NoError(t, node.Shutdown(ctx)) }()
+
+	tests := []struct {
+		name    string
+		key     string
+		payload []byte
+	}{
+		{"job with payload", "job1", []byte("payload1")},
+		{"job without payload", "job2", nil},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.NoError(t, node.DispatchJob(ctx, tt.key, tt.payload), "Failed to dispatch job")
+
+			// Check if job payload is correct
+			assert.Eventually(t, func() bool {
+				payload, ok := node.JobPayload(tt.key)
+				fmt.Println(payload, ok)
+				fmt.Println(tt.payload)
+				return ok && assert.Equal(t, tt.payload, payload)
+			}, max, delay, fmt.Sprintf("Failed to get correct payload for job %s", tt.key))
+		})
+	}
+
+	// Test non-existent job
+	payload, ok := node.JobPayload("non-existent-job")
+	assert.False(t, ok, "Expected false for non-existent job")
+	assert.Nil(t, payload, "Expected nil payload for non-existent job")
+
+	// Update existing job
+	updatedPayload := []byte("updated payload")
+	assert.NoError(t, node.DispatchJob(ctx, "job1", updatedPayload), "Failed to update existing job")
+
+	// Check if the payload was updated
+	assert.Eventually(t, func() bool {
+		payload, ok := node.JobPayload("job1")
+		return ok && assert.Equal(t, updatedPayload, payload, "Payload was not updated correctly")
+	}, max, delay, "Failed to get updated payload for job")
+}
+
 func TestDispatchJobOneWorker(t *testing.T) {
 	testName := strings.Replace(t.Name(), "/", "_", -1)
 	ctx := ptesting.NewTestContext(t)
@@ -306,6 +470,34 @@ func TestNodeCloseAndRequeue(t *testing.T) {
 	require.NoError(t, node2.Shutdown(ctx), "Failed to shutdown node2")
 }
 
+func TestAckWorkerEventWithMissingPendingEvent(t *testing.T) {
+	// Setup
+	ctx := ptesting.NewTestContext(t)
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	rdb := ptesting.NewRedisClient(t)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer func() { assert.NoError(t, node.Shutdown(ctx)) }()
+
+	// Create a mock event with a non-existent pending event ID
+	mockEvent := &streaming.Event{
+		ID:        "non-existent-event-id",
+		EventName: evAck,
+		Payload:   marshalEnvelope("worker", marshalAck(&ack{EventID: "non-existent-event-id"})),
+		Acker: &mockAcker{
+			XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+				return redis.NewIntCmd(ctx, 0)
+			},
+		},
+	}
+
+	// Call ackWorkerEvent with the mock event
+	node.ackWorkerEvent(ctx, mockEvent)
+
+	// Verify that no panic occurred and the function completed successfully
+	assert.True(t, true, "ackWorkerEvent should complete without panic")
+}
+
 func TestStaleEventsAreRemoved(t *testing.T) {
 	// Setup
 	ctx := ptesting.NewTestContext(t)
diff --git a/pool/worker.go b/pool/worker.go
index b5cfcd7..7798b23 100644
--- a/pool/worker.go
+++ b/pool/worker.go
@@ -144,7 +144,14 @@ func (w *Worker) Jobs() []*Job {
 	sort.Strings(keys)
 	jobs := make([]*Job, 0, len(w.jobs))
 	for _, key := range keys {
-		jobs = append(jobs, w.jobs[key])
+		job := w.jobs[key]
+		jobs = append(jobs, &Job{
+			Key:       key,
+			Payload:   job.Payload,
+			CreatedAt: job.CreatedAt,
+			Worker:    &Worker{ID: w.ID, Node: w.Node, CreatedAt: w.CreatedAt},
+			NodeID:    job.NodeID,
+		})
 	}
 	return jobs
 }
diff --git a/rmap/map.go b/rmap/map.go
index 3cffe14..477f556 100644
--- a/rmap/map.go
+++ b/rmap/map.go
@@ -532,7 +532,7 @@ func (sm *Map) RemoveValues(ctx context.Context, key string, items ...string) ([
 	if remaining == "" {
 		return nil, true, nil // All items were removed, key was deleted
 	}
-	removed := result[1].(int64) == 1
+	removed := result[1] != nil && result[1].(int64) == 1
 	return strings.Split(remaining, ","), removed, nil
 }
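
Reviewer note (illustrative only, not part of the patch): a minimal sketch of how the new Node methods added above might be exercised from application code. It assumes the goa.design/pulse/pool import path; the example package, the inspectPool helper, and the job key and payload values are hypothetical.

package example

import (
	"context"
	"fmt"

	"goa.design/pulse/pool"
)

// inspectPool dispatches one job, then lists the pool's workers and jobs
// using the methods introduced in this patch.
func inspectPool(ctx context.Context, node *pool.Node) error {
	if err := node.DispatchJob(ctx, "report:daily", []byte(`{"day":"monday"}`)); err != nil {
		return fmt.Errorf("dispatch failed: %w", err)
	}

	// PoolWorkers lists workers across all nodes in the pool, unlike
	// Workers which only returns the workers local to this node.
	for _, w := range node.PoolWorkers() {
		fmt.Printf("worker %s created at %s\n", w.ID, w.CreatedAt)
	}

	// JobKeys lists the keys of the jobs currently running in the pool;
	// JobPayload reports (nil, false) once a job is gone.
	for _, key := range node.JobKeys() {
		payload, ok := node.JobPayload(key)
		if !ok {
			continue // job finished or was stopped after JobKeys returned
		}
		fmt.Printf("job %s payload: %d bytes\n", key, len(payload))
	}
	return nil
}

Because JobKeys and JobPayload read from separate maps, a key returned by JobKeys may no longer resolve to a payload by the time JobPayload is called, which is why the sketch treats ok == false as "job gone" rather than as an error.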