Add ability to list jobs and workers #39

Merged: 1 commit, Oct 21, 2024
62 changes: 56 additions & 6 deletions pool/node.go
@@ -7,8 +7,10 @@ import (
"hash"
"hash/crc64"
"io"
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -261,13 +263,39 @@ func (node *Node) Workers() []*Worker {
node.lock.Lock()
defer node.lock.Unlock()
workers := make([]*Worker, len(node.localWorkers))
- copy(workers, node.localWorkers)
for i, w := range node.localWorkers {
workers[i] = &Worker{
ID: w.ID,
CreatedAt: w.CreatedAt,
Node: node,
}
}
return workers
}
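Since the loop above hands back fresh Worker values instead of aliasing node.localWorkers, callers can keep the returned slice without holding the node lock. A minimal usage sketch (assumes a started node plus fmt and time imports):

for _, w := range node.Workers() {
    fmt.Printf("local worker %s, created %s\n", w.ID, w.CreatedAt.Format(time.RFC3339))
}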

// PoolWorkers returns the list of workers running in the pool.
func (node *Node) PoolWorkers() []*Worker {
workers := node.workerMap.Map()
poolWorkers := make([]*Worker, 0, len(workers))
for id, createdAt := range workers {
cat, err := strconv.ParseInt(createdAt, 10, 64)
if err != nil {
node.logger.Error(fmt.Errorf("PoolWorkers: failed to parse createdAt %q for worker %q: %w", createdAt, id, err))
continue
}
poolWorkers = append(poolWorkers, &Worker{ID: id, CreatedAt: time.Unix(0, cat), Node: node})
}
return poolWorkers
}
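PoolWorkers suggests the replicated worker map stores creation times as stringified Unix nanoseconds; that format is inferred from the parsing above, not a documented contract. A round-trip sketch under that assumption:

raw := "1729500000000000000"             // hypothetical workerMap value
ns, err := strconv.ParseInt(raw, 10, 64) // same parse as PoolWorkers
if err != nil {
    // malformed entries are logged and skipped rather than failing the listing
}
createdAt := time.Unix(0, ns) // ≈ 2024-10-21 UTC
_ = createdAt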

// DispatchJob dispatches a job to the proper worker in the pool.
- // It returns the error returned by the worker's start handler if any.
- // If the context is done before the job is dispatched, the context error is returned.
// It returns:
// - nil if the job is successfully dispatched and started by a worker
// - an error returned by the worker's start handler if the job fails to start
// - the context error if the context is canceled before the job is started
// - an error if the pool is closed or if there's a failure in adding the job
//
// The method blocks until one of the above conditions is met.
func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) error {
// Send job to pool stream.
node.lock.Lock()
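A hedged usage sketch of the dispatch contract documented above (job key and payload are illustrative; assumes context, log, and time imports):

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Blocks until a worker starts the job, the start handler errors,
// the context expires, or the pool is closed.
if err := node.DispatchJob(ctx, "order-123", []byte(`{"qty":2}`)); err != nil {
    log.Printf("dispatch failed: %v", err)
}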
@@ -313,6 +341,31 @@ func (node *Node) StopJob(ctx context.Context, key string) error {
return nil
}

// JobKeys returns the list of keys of the jobs running in the pool.
func (node *Node) JobKeys() []string {
var jobKeys []string
jobByNodes := node.jobsMap.Map()
for _, jobs := range jobByNodes {
jobKeys = append(jobKeys, strings.Split(jobs, ",")...)
}
return jobKeys
}
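The strings.Split call implies each jobs-map entry is one comma-separated string of keys per node; the layout below is an illustration inferred from that, not a documented schema. (Note that strings.Split on an empty string yields [""], so the map presumably never stores empty entries.)

// Assumed map contents:
//   "node-a" -> "job1,job2"
//   "node-b" -> "job3"
keys := node.JobKeys() // e.g. ["job1", "job2", "job3"], order unspecified
if slices.Contains(keys, "job2") {
    // job2 is currently running somewhere in the pool
}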

// JobPayload returns the payload of the job with the given key.
// It returns:
// - (payload, true) if the job exists and has a payload
// - (nil, true) if the job exists but has no payload (empty payload)
// - (nil, false) if the job does not exist
func (node *Node) JobPayload(key string) ([]byte, bool) {
payload, ok := node.jobPayloadsMap.Get(key)
if ok {
return []byte(payload), true
}
keys := node.JobKeys()
return nil, slices.Contains(keys, key)
}
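Callers need both return values to tell the three cases apart; a short sketch:

payload, ok := node.JobPayload("job1")
switch {
case ok && len(payload) > 0:
    // job exists and carries a payload
case ok:
    // job exists but was dispatched with an empty payload
default:
    // no job with this key is running in the pool
}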

// NotifyWorker notifies the worker that handles the job with the given key.
func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) error {
node.lock.Lock()
@@ -534,9 +587,6 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
pending, ok := node.pendingEvents[key]
if !ok {
node.logger.Error(fmt.Errorf("ackWorkerEvent: received unknown event %s from worker %s", ack.EventID, workerID))
- if err := node.poolSink.Ack(ctx, pending); err != nil {
- node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to ack unknown event: %w", err), "event", pending.EventName, "id", pending.ID)
- }
return
}

192 changes: 192 additions & 0 deletions pool/node_test.go
@@ -22,6 +22,170 @@ const (
max = time.Second
)

func TestWorkers(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
ctx := ptesting.NewTestContext(t)
rdb := ptesting.NewRedisClient(t)
node := newTestNode(t, ctx, rdb, testName)
defer ptesting.CleanupRedis(t, rdb, true, testName)

// Create a few workers
worker1 := newTestWorker(t, ctx, node)
worker2 := newTestWorker(t, ctx, node)
worker3 := newTestWorker(t, ctx, node)

// Get the list of workers
workers := node.Workers()

// Check if the number of workers is correct
assert.Equal(t, 3, len(workers), "Expected 3 workers")

// Check if all created workers are in the list
expectedWorkers := []string{worker1.ID, worker2.ID, worker3.ID}
actualWorkers := make([]string, len(workers))
for i, w := range workers {
actualWorkers[i] = w.ID
}
assert.ElementsMatch(t, expectedWorkers, actualWorkers, "The list of workers should contain all created workers")

// Shutdown node
assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
}

func TestPoolWorkers(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
ctx := ptesting.NewTestContext(t)
rdb := ptesting.NewRedisClient(t)
node := newTestNode(t, ctx, rdb, testName)
defer ptesting.CleanupRedis(t, rdb, true, testName)

// Create workers on the current node
worker1 := newTestWorker(t, ctx, node)
worker2 := newTestWorker(t, ctx, node)

// Create a worker on a different node
otherNode := newTestNode(t, ctx, rdb, testName)
worker3 := newTestWorker(t, ctx, otherNode)
defer func() { assert.NoError(t, otherNode.Shutdown(ctx)) }()

// Check if the number of workers is correct (should include workers from all nodes)
assert.Eventually(t, func() bool {
return len(node.PoolWorkers()) == 3
}, max, delay, "Expected 3 workers in the pool")

// Check if all created workers are in the list
poolWorkers := node.PoolWorkers()
workerIDs := make([]string, len(poolWorkers))
for i, w := range poolWorkers {
workerIDs[i] = w.ID
}

expectedWorkerIDs := []string{worker1.ID, worker2.ID, worker3.ID}
assert.ElementsMatch(t, expectedWorkerIDs, workerIDs, "Not all expected workers were found in the pool")

// Shutdown nodes
assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
}

func TestJobKeys(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
ctx := ptesting.NewTestContext(t)
rdb := ptesting.NewRedisClient(t)
defer ptesting.CleanupRedis(t, rdb, true, testName)

node1 := newTestNode(t, ctx, rdb, testName)
node2 := newTestNode(t, ctx, rdb, testName)
newTestWorker(t, ctx, node1)
newTestWorker(t, ctx, node2)
defer func() {
assert.NoError(t, node1.Shutdown(ctx))
assert.NoError(t, node2.Shutdown(ctx))
}()

// Configure nodes to send jobs to specific workers
node1.h, node2.h = &ptesting.Hasher{Index: 0}, &ptesting.Hasher{Index: 1}

jobs := []struct {
key string
payload []byte
}{
{key: "job1", payload: []byte("payload1")},
{key: "job2", payload: []byte("payload2")},
{key: "job3", payload: []byte("payload3")},
{key: "job4", payload: []byte("payload4")},
}

for _, job := range jobs {
assert.NoError(t, node1.DispatchJob(ctx, job.key, job.payload), fmt.Sprintf("Failed to dispatch job: %s", job.key))
}

// Get job keys from the pool and check if all dispatched job keys are present
var allJobKeys []string
assert.Eventually(t, func() bool {
allJobKeys = node1.JobKeys()
return len(jobs) == len(allJobKeys)
}, max, delay, "Number of job keys doesn't match the number of dispatched jobs")
for _, job := range jobs {
assert.Contains(t, allJobKeys, job.key, fmt.Sprintf("Job key %s not found in JobKeys", job.key))
}

// Dispatch a job with an existing key to node1
assert.NoError(t, node1.DispatchJob(ctx, "job1", []byte("updated payload")), "Failed to dispatch job with existing key")

// Check that the number of job keys hasn't changed
updatedAllJobKeys := node1.JobKeys()
assert.Equal(t, len(allJobKeys), len(updatedAllJobKeys), "Number of job keys shouldn't change when updating an existing job")
}

func TestJobPayload(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
ctx := ptesting.NewTestContext(t)
rdb := ptesting.NewRedisClient(t)
defer ptesting.CleanupRedis(t, rdb, true, testName)

node := newTestNode(t, ctx, rdb, testName)
newTestWorker(t, ctx, node)
defer func() { assert.NoError(t, node.Shutdown(ctx)) }()

tests := []struct {
name string
key string
payload []byte
}{
{"job with payload", "job1", []byte("payload1")},
{"job without payload", "job2", nil},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, node.DispatchJob(ctx, tt.key, tt.payload), "Failed to dispatch job")

// Check if job payload is correct
assert.Eventually(t, func() bool {
payload, ok := node.JobPayload(tt.key)
return ok && string(payload) == string(tt.payload)
}, max, delay, fmt.Sprintf("Failed to get correct payload for job %s", tt.key))
})
}

// Test non-existent job
payload, ok := node.JobPayload("non-existent-job")
assert.False(t, ok, "Expected false for non-existent job")
assert.Nil(t, payload, "Expected nil payload for non-existent job")

// Update existing job
updatedPayload := []byte("updated payload")
assert.NoError(t, node.DispatchJob(ctx, "job1", updatedPayload), "Failed to update existing job")

// Check if the payload was updated
assert.Eventually(t, func() bool {
payload, ok := node.JobPayload("job1")
return ok && string(payload) == string(updatedPayload)
}, max, delay, "Failed to get updated payload for job")
}

func TestDispatchJobOneWorker(t *testing.T) {
testName := strings.Replace(t.Name(), "/", "_", -1)
ctx := ptesting.NewTestContext(t)
@@ -306,6 +470,34 @@ func TestNodeCloseAndRequeue(t *testing.T) {
require.NoError(t, node2.Shutdown(ctx), "Failed to shutdown node2")
}

func TestAckWorkerEventWithMissingPendingEvent(t *testing.T) {
// Setup
ctx := ptesting.NewTestContext(t)
testName := strings.Replace(t.Name(), "/", "_", -1)
rdb := ptesting.NewRedisClient(t)
defer ptesting.CleanupRedis(t, rdb, true, testName)
node := newTestNode(t, ctx, rdb, testName)
defer func() { assert.NoError(t, node.Shutdown(ctx)) }()

// Create a mock event with a non-existent pending event ID
mockEvent := &streaming.Event{
ID: "non-existent-event-id",
EventName: evAck,
Payload: marshalEnvelope("worker", marshalAck(&ack{EventID: "non-existent-event-id"})),
Acker: &mockAcker{
XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
return redis.NewIntCmd(ctx, 0)
},
},
}

// Call ackWorkerEvent with the mock event
node.ackWorkerEvent(ctx, mockEvent)

// Success criterion: ackWorkerEvent logs and drops the unknown ack
// without panicking or acking a nil pending event.
}

func TestStaleEventsAreRemoved(t *testing.T) {
// Setup
ctx := ptesting.NewTestContext(t)
9 changes: 8 additions & 1 deletion pool/worker.go
@@ -144,7 +144,14 @@ func (w *Worker) Jobs() []*Job {
sort.Strings(keys)
jobs := make([]*Job, 0, len(w.jobs))
for _, key := range keys {
- jobs = append(jobs, w.jobs[key])
job := w.jobs[key]
jobs = append(jobs, &Job{
Key: key,
Payload: job.Payload,
CreatedAt: job.CreatedAt,
Worker: &Worker{ID: w.ID, Node: w.Node, CreatedAt: w.CreatedAt},
NodeID: job.NodeID,
})
}
return jobs
}
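As with Node.Workers above, Jobs now returns detached copies, so callers cannot mutate the worker's internal job map through the result. A minimal iteration sketch (assumes fmt and time imports, and that CreatedAt is a time.Time as the struct literal suggests):

for _, job := range w.Jobs() {
    fmt.Printf("job %s on node %s, created %s\n", job.Key, job.NodeID, job.CreatedAt.Format(time.RFC3339))
}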
2 changes: 1 addition & 1 deletion rmap/map.go
@@ -532,7 +532,7 @@ func (sm *Map) RemoveValues(ctx context.Context, key string, items ...string) ([
if remaining == "" {
return nil, true, nil // All items were removed, key was deleted
}
- removed := result[1].(int64) == 1
removed := result[1] != nil && result[1].(int64) == 1
return strings.Split(remaining, ","), removed, nil
}
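The added nil check matters because a Redis script reply slot can come back as an untyped nil, and a bare .(int64) assertion on nil panics. A comma-ok guard is an equivalent, slightly more idiomatic form (a sketch, not this package's code):

removed := false
if n, ok := result[1].(int64); ok {
    removed = n == 1
}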
