Properly delete stale workers with no job (#37)
* Properly delete stale workers with no job

* Only stop the proper worker

* Allow for jobs with no payloads

* Fix race condition in test

* Properly shutdown node in test
raphael authored Oct 18, 2024
1 parent 3e2089d commit 274a813
Showing 6 changed files with 139 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docker-compose/docker-compose-redis.yaml
@@ -2,6 +2,6 @@ services:
redis:
image: "redis"
container_name: "pulse-redis"
-command: redis-server --save 20 1 --loglevel warning --requirepass ${REDIS_PASSWORD}
+command: redis-server --save "" --loglevel warning --requirepass ${REDIS_PASSWORD}
ports:
- "6379:6379"
48 changes: 31 additions & 17 deletions pool/node.go
@@ -620,6 +620,23 @@ func (node *Node) handleWorkerMapUpdate(ctx context.Context) {
if node.closing {
return
}
// First cleanup the local workers that are no longer active.
for _, worker := range node.localWorkers {
if _, ok := node.workerMap.Get(worker.ID); !ok {
// If it's not in the worker map, then it's not active and its jobs
// have already been requeued.
node.logger.Info("handleWorkerMapUpdate: removing inactive local worker", "worker", worker.ID)
worker.stopAndWait(ctx)
for i, w := range node.localWorkers {
if worker.ID == w.ID {
node.localWorkers = append(node.localWorkers[:i], node.localWorkers[i+1:]...)
break
}
}
}
}

// Then rebalance the jobs across the remaining active workers.
activeWorkers := node.activeWorkers()
if len(activeWorkers) == 0 {
return
@@ -727,10 +744,11 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
continue
}
lastSeen := time.Unix(0, lsi)
-if time.Since(lastSeen) <= node.workerTTL {
+lsd := time.Since(lastSeen)
+if lsd <= node.workerTTL {
continue
}
node.logger.Debug("processInactiveWorkers: removing worker", "worker", id)
node.logger.Debug("processInactiveWorkers: removing worker", "worker", id, "last-seen", lsd, "ttl", node.workerTTL)

// Use optimistic locking to set the keep-alive timestamp to a value
// in the future so that another node does not also requeue the jobs.
@@ -747,21 +765,15 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {

keys, ok := node.jobsMap.GetValues(id)
if !ok {
-continue // worker is already being deleted
+// Worker has no jobs, so delete it right away.
+if err := node.deleteWorker(ctx, id); err != nil {
+	node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to delete worker %q: %w", id, err), "worker", id)
+}
+continue
}
-mustRequeue := len(keys)
requeued := make(map[string]chan error)
for _, key := range keys {
-payload, ok := node.jobPayloadsMap.Get(key)
-if !ok {
-	node.logger.Error(fmt.Errorf("processInactiveWorkers: payload for job not found"), "job", key, "worker", id)
-	// No need to keep the job around if the payload is not found.
-	if _, _, err := node.jobsMap.RemoveValues(ctx, id, key); err != nil {
-		node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to remove job %q from jobs map: %w", key, err), "job", key, "worker", id)
-	}
-	mustRequeue--
-	continue
-}
+payload, _ := node.jobPayloadsMap.Get(key) // Some jobs have no payload
job := &Job{
Key: key,
Payload: []byte(payload),
@@ -776,10 +788,11 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
requeued[job.Key] = cherr
}

-if len(requeued) != mustRequeue {
-	node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), mustRequeue), "worker", id)
+allRequeued := len(requeued) == len(keys)
+if !allRequeued {
+	node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), len(keys)), "worker", id)
}
-go node.processRequeuedJobs(ctx, id, requeued, len(requeued) == mustRequeue)
+go node.processRequeuedJobs(ctx, id, requeued, allRequeued)
}
}

@@ -866,6 +879,7 @@ func (node *Node) activeWorkers() []string {

// deleteWorker removes a worker from the pool deleting the worker stream.
func (node *Node) deleteWorker(ctx context.Context, id string) error {
node.logger.Debug("deleteWorker: deleting worker", "worker", id)
if _, err := node.keepAliveMap.Delete(ctx, id); err != nil {
node.logger.Error(fmt.Errorf("deleteWorker: failed to delete worker %q from keep-alive map: %w", id, err))
}
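Taken together, the pool/node.go changes reduce to a three-way decision for each worker found in the keep-alive map: leave it alone while its keep-alive is fresh, delete it immediately once it is stale and owns no jobs, or requeue its jobs and only then delete it. Below is a minimal, self-contained sketch of that control flow; the decide and action names are illustrative only and do not exist in the pool package, and the real code additionally uses optimistic locking on the keep-alive map so that only one node requeues a given worker's jobs.

package main

import (
	"fmt"
	"time"
)

// action mirrors the three branches processInactiveWorkers can take for a
// worker found in the keep-alive map.
type action string

const (
	actionSkip    action = "skip: keep-alive still within TTL"
	actionDelete  action = "delete immediately: no jobs to requeue"
	actionRequeue action = "requeue jobs, then delete once all are requeued"
)

// decide condenses the new control flow: only workers whose last keep-alive
// is older than the TTL are touched; workers with no jobs are deleted right
// away, the rest have their jobs requeued first.
func decide(lastSeen time.Time, ttl time.Duration, jobKeys []string) action {
	if time.Since(lastSeen) <= ttl {
		return actionSkip
	}
	if len(jobKeys) == 0 {
		return actionDelete
	}
	return actionRequeue
}

func main() {
	ttl := 10 * time.Second
	fmt.Println(decide(time.Now(), ttl, nil))                               // skip
	fmt.Println(decide(time.Now().Add(-time.Minute), ttl, nil))             // delete immediately
	fmt.Println(decide(time.Now().Add(-time.Minute), ttl, []string{"job"})) // requeue
}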
2 changes: 2 additions & 0 deletions pool/ticker_test.go
@@ -31,7 +31,9 @@ func TestNewTicker(t *testing.T) {
assert.WithinDuration(t, startTime.Add(tickDuration), firstTick, time.Second, "First tick should occur after approximately one tick duration")

// Verify next tick time and duration
ticker.lock.Lock()
nextTickTime, tickerDuration := deserialize(ticker.next)
ticker.lock.Unlock()
assert.WithinDuration(t, startTime.Add(tickDuration), nextTickTime, time.Second, "Next tick time should be approximately one tick duration from start")
assert.Equal(t, tickDuration, tickerDuration, "Ticker duration should match the specified duration")

89 changes: 86 additions & 3 deletions pool/worker_test.go
@@ -2,6 +2,7 @@ package pool

import (
"context"
"strconv"
"strings"
"testing"
"time"
@@ -32,9 +33,7 @@ func TestWorkerRequeueJobs(t *testing.T) {

// Emulate the worker failing by preventing it from refreshing its keepalive
// This means we can't cleanup cleanly, hence "false" in CleanupRedis
-worker.lock.Lock()
-worker.stopped = true
-worker.lock.Unlock()
+worker.stopAndWait(ctx)

// Create a new worker to pick up the requeued job
newWorker := newTestWorker(t, ctx, node)
@@ -50,4 +49,88 @@ func TestWorkerRequeueJobs(t *testing.T) {
require.Eventually(t, func() bool {
return len(newWorker.Jobs()) == 2
}, time.Second, delay, "job was not requeued")

// Cleanup
assert.NoError(t, node.Shutdown(ctx))
}

func TestStaleWorkerCleanupInNode(t *testing.T) {
var (
ctx = ptesting.NewTestContext(t)
testName = strings.Replace(t.Name(), "/", "_", -1)
rdb = ptesting.NewRedisClient(t)
node = newTestNode(t, ctx, rdb, testName)
)
defer ptesting.CleanupRedis(t, rdb, false, testName)

// Create one active worker
activeWorker := newTestWorker(t, ctx, node)
t.Log("active worker", activeWorker.ID)

// Create five stale workers
staleWorkers := make([]*Worker, 5)
for i := 0; i < 5; i++ {
staleWorkers[i] = newTestWorker(t, ctx, node)
staleWorkers[i].stop(ctx)
// Set the last seen time to a past time
_, err := node.keepAliveMap.Set(ctx, staleWorkers[i].ID, strconv.FormatInt(time.Now().Add(-2*node.workerTTL).UnixNano(), 10))
assert.NoError(t, err)
}

// Wait for the cleanup process to run
time.Sleep(3 * node.workerTTL)

// Check if only the active worker remains
workers := node.activeWorkers()
assert.Len(t, workers, 1, "There should be only one worker remaining")
assert.Contains(t, workers, activeWorker.ID, "The active worker should still exist")

// Cleanup
assert.NoError(t, node.Shutdown(ctx))
}

func TestStaleWorkerCleanupAcrossNodes(t *testing.T) {
var (
ctx = ptesting.NewTestContext(t)
testName = strings.Replace(t.Name(), "/", "_", -1)
rdb = ptesting.NewRedisClient(t)
node1 = newTestNode(t, ctx, rdb, testName+"_1")
node2 = newTestNode(t, ctx, rdb, testName+"_2")
)
defer ptesting.CleanupRedis(t, rdb, false, testName)

// Create one active worker on node1
activeWorker := newTestWorker(t, ctx, node1)

// Create five stale workers on node2
staleWorkers := make([]*Worker, 5)
for i := 0; i < 5; i++ {
staleWorkers[i] = newTestWorker(t, ctx, node2)
staleWorkers[i].stop(ctx)
// Set the last seen time to a past time
_, err := node2.keepAliveMap.Set(ctx, staleWorkers[i].ID, strconv.FormatInt(time.Now().Add(-2*node2.workerTTL).UnixNano(), 10))
assert.NoError(t, err)
}

// Wait for the cleanup process to run
time.Sleep(3 * node2.workerTTL)

// Check if only the active worker remains on node1
workers1 := node1.activeWorkers()
assert.Len(t, workers1, 1, "There should be only one worker remaining on node1")
assert.Contains(t, workers1, activeWorker.ID, "The active worker should still exist on node1")

// Check if all workers have been removed from node2
workers2 := node2.activeWorkers()
assert.Len(t, workers2, 0, "There should be no workers remaining on node2")

// Verify that stale workers are not in the worker map of node2
for _, worker := range staleWorkers {
_, exists := node2.workerMap.Get(worker.ID)
assert.False(t, exists, "Stale worker %s should not exist in the worker map of node2", worker.ID)
}

// Cleanup
assert.NoError(t, node1.Shutdown(ctx))
assert.NoError(t, node2.Shutdown(ctx))
}
19 changes: 19 additions & 0 deletions scripts/stop-redis
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
set -e

# Get the root directory of the git repository
GIT_ROOT=$(git rev-parse --show-toplevel)

# Change to the git root directory
pushd "${GIT_ROOT}"

# Source common utilities and environment variables
# shellcheck source=utils/common.sh
source ./scripts/utils/common.sh
source .env

# Stop the Redis Docker container
docker compose -p redis -f docker-compose/docker-compose-redis.yaml down

# Return to the original directory
popd
1 change: 0 additions & 1 deletion streaming/sink.go
@@ -334,7 +334,6 @@ func (s *Sink) newConsumer(ctx context.Context, stream *Stream) (string, error)
}
return "", fmt.Errorf("failed to set sink keep-alive for new consumer %s: %w", consumer, err)
}
s.logger.Debug("created new consumer", "consumer", consumer)
return consumer, nil
}

