Skip to content

Commit

Permalink
Merge pull request #13 from SberMarket-Tech/retry-on-error
Browse files Browse the repository at this point in the history
Add retries for committing completed jobs
  • Loading branch information
hageshtrem authored Mar 27, 2023
2 parents 7f03a5b + d45e6b4 commit cf880cc
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 40 deletions.
53 changes: 49 additions & 4 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

const (
deadTime = 10 * time.Second // 2 x heartbeat
reapPeriod = 10 * time.Minute
defaultReapPeriod = 5 * time.Minute
reapJitterSecs = 30
requeueKeysPerJob = 4
)
Expand All @@ -31,7 +31,11 @@ type deadPoolReaper struct {
doneStoppingChan chan struct{}
}

func newDeadPoolReaper(namespace string, pool Pool, curJobTypes []string) *deadPoolReaper {
func newDeadPoolReaper(namespace string, pool Pool, curJobTypes []string, reapPeriod time.Duration) *deadPoolReaper {
if reapPeriod == 0 {
reapPeriod = defaultReapPeriod
}

return &deadPoolReaper{
namespace: namespace,
pool: pool,
Expand All @@ -53,6 +57,8 @@ func (r *deadPoolReaper) stop() {
}

func (r *deadPoolReaper) loop() {
Logger.Printf("Reaper: started with a period of %v", r.reapPeriod)

// Reap immediately after we provide some time for initialization
timer := time.NewTimer(r.deadTime)
defer timer.Stop()
Expand All @@ -66,7 +72,6 @@ func (r *deadPoolReaper) loop() {
// Schedule next occurrence periodically with jitter
timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second)

// Reap
if err := r.reap(); err != nil {
logError("dead_pool_reaper.reap", err)
}
Expand All @@ -80,24 +85,35 @@ func (r *deadPoolReaper) reap() (err error) {
return err
}

Logger.Printf("Reaper: trying to acquire lock...")

acquired, err := r.acquireLock(lockValue)
if err != nil {
Logger.Printf("Reaper: acquiring lock: %v", err)
return err
}

// Another reaper is already running
if !acquired {
Logger.Printf("Reaper: locked by another process")
return nil
}

Logger.Printf("Reaper: lock is acquired")

defer func() {
err = r.releaseLock(lockValue)
}()

rErr := r.reapDeadPools()
cErr := r.clearUnknownPools()

return multierr.Combine(rErr, cErr)
// TODO: consider refactoring requeueInProgressJobs and cleanStaleLockInfo
// and removing removeDanglingLocks. There was a block where lock is 1 and
// lock_info is 0.
dErr := r.removeDanglingLocks()

return multierr.Combine(err, rErr, cErr, dErr)
}

// reapDeadPools collects the IDs of expired heartbeat pools and releases the
Expand All @@ -108,6 +124,8 @@ func (r *deadPoolReaper) reapDeadPools() error {
return err
}

Logger.Printf("Reaper: dead pools: %v", deadPoolIDs)

conn := r.pool.Get()
defer conn.Close()

Expand Down Expand Up @@ -150,6 +168,8 @@ func (r *deadPoolReaper) clearUnknownPools() error {
return err
}

Logger.Printf("Reaper: unknown pools: %v", unknownPools)

for poolID, jobTypes := range unknownPools {
if err = r.requeueInProgressJobs(poolID, jobTypes); err != nil {
return err
Expand Down Expand Up @@ -293,6 +313,31 @@ func (r *deadPoolReaper) getUnknownPools() (map[string][]string, error) {
return pools, nil
}

// removeDanglingLocks runs redisRemoveDanglingLocksScript over the lock and
// lock_info keys of every current job type, so that each lock key is adjusted
// according to its lock_info numbers, and logs the lock keys the script
// reports back. Any error from the script call is returned unchanged.
// TODO: it's better to find where the inconsistency comes from.
func (r *deadPoolReaper) removeDanglingLocks() error {
	// Every job type contributes two keys: its lock and its lock_info.
	numKeys := 2 * len(r.curJobTypes)

	// The first script argument is the key count; the key pairs follow it.
	args := make([]interface{}, 1, numKeys+1)
	args[0] = numKeys
	for _, jobType := range r.curJobTypes {
		args = append(args,
			redisKeyJobsLock(r.namespace, jobType),
			redisKeyJobsLockInfo(r.namespace, jobType),
		)
	}

	conn := r.pool.Get()
	defer conn.Close()

	fixed, err := redis.Strings(redisRemoveDanglingLocksScript.Do(conn, args...))
	if err == nil {
		Logger.Printf("Reaper: dangling locks: %v", fixed)
	}

	return err
}

// acquireLock acquires lock with a value and an expiration time for reap period.
func (r *deadPoolReaper) acquireLock(value string) (bool, error) {
conn := r.pool.Get()
Expand Down
77 changes: 68 additions & 9 deletions dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestDeadPoolReaper(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pool
reaper := newDeadPoolReaper(ns, pool, []string{})
reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools)
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
assert.EqualValues(t, 3, numPools)

// Test getting dead pool ids
reaper := newDeadPoolReaper(ns, pool, []string{"type1"})
reaper := newDeadPoolReaper(ns, pool, []string{"type1"}, 0)
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"1": nil, "2": nil, "3": nil}, deadPools)
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pool
reaper := newDeadPoolReaper(ns, pool, []string{})
reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}}, deadPools)
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {

// setup a worker pool and start the reaper, which should restart the stale job above
wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1})
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"})
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"}, 0)
wp.deadPoolReaper.deadTime = expectedDeadTime
wp.deadPoolReaper.start()

Expand Down Expand Up @@ -327,7 +327,7 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
err = conn.Flush()
assert.NoError(t, err)

reaper := newDeadPoolReaper(ns, pool, jobNames)
reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
// clean lock info for workerPoolID1
err = reaper.cleanStaleLockInfo(workerPoolID1, jobNames)
assert.NoError(t, err)
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestDeadPoolReaperTakeDeadPools(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pools
reaper := newDeadPoolReaper(ns, pool, []string{})
reaper := newDeadPoolReaper(ns, pool, []string{}, 0)
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": nil}, deadPools)
Expand All @@ -395,7 +395,7 @@ func TestReaperLock(t *testing.T) {
ns := "work"
cleanKeyspace(ns, pool)

reaper := newDeadPoolReaper(ns, pool, []string{})
reaper := newDeadPoolReaper(ns, pool, []string{}, 0)

value, err := genValue()
assert.NoError(t, err)
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestDeadPoolReaperGetUnknownPools(t *testing.T) {
assert.NoError(t, conn.Flush())

// Run test
reaper := newDeadPoolReaper(ns, pool, jobNames)
reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
unknownPools, err := reaper.getUnknownPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, unknownPools)
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestDeadPoolReaperClearUnknownPool(t *testing.T) {
assert.NoError(t, conn.Flush())

// Run test
reaper := newDeadPoolReaper(ns, pool, jobNames)
reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
err = reaper.clearUnknownPools()
assert.NoError(t, err)

Expand All @@ -551,3 +551,62 @@ func TestDeadPoolReaperClearUnknownPool(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, map[string]string{workerPoolID1: "0"}, nLockInfo2)
}

// TestDeadPoolReaperRemoveDanglingLocks seeds lock counters and lock_info
// hashes for several job types, runs removeDanglingLocks and verifies that
// every existing lock counter ends up equal to the sum of its lock_info values.
func TestDeadPoolReaperRemoveDanglingLocks(t *testing.T) {
	const (
		ns      = "work"
		poolID1 = "1"
		poolID2 = "2"
	)

	pool := newTestPool(":6379")
	cleanKeyspace(ns, pool)

	// The fourth job type gets neither a lock nor a lock_info key.
	jobNames := []string{"type1", "type2", "type3", "type4"}

	lockKey1 := redisKeyJobsLock(ns, jobNames[0])
	lockKey2 := redisKeyJobsLock(ns, jobNames[1])
	lockKey3 := redisKeyJobsLock(ns, jobNames[2])
	infoKey1 := redisKeyJobsLockInfo(ns, jobNames[0])
	infoKey2 := redisKeyJobsLockInfo(ns, jobNames[1])

	conn := pool.Get()
	defer conn.Close()

	// Create redis data; commands are pipelined in this exact order.
	seed := []struct {
		cmd  string
		args []interface{}
	}{
		// One dangling lock: counter is 4, lock_info sums to 3.
		{"SET", []interface{}{lockKey1, 4}},
		{"HMSET", []interface{}{infoKey1, poolID1, 2, poolID2, 1}},
		// No dangling locks: counter is 1, lock_info sums to 1.
		{"SET", []interface{}{lockKey2, 1}},
		{"HMSET", []interface{}{infoKey2, poolID1, 0, poolID2, 1}},
		// One dangling lock: counter is 1 and there is no lock_info at all.
		{"SET", []interface{}{lockKey3, 1}},
	}
	for _, c := range seed {
		assert.NoError(t, conn.Send(c.cmd, c.args...))
	}

	assert.NoError(t, conn.Flush())

	reaper := newDeadPoolReaper(ns, pool, jobNames, 0)
	assert.NoError(t, reaper.removeDanglingLocks())

	// Checks
	expectations := []struct {
		key  string
		want int
	}{
		{lockKey1, 3},
		{lockKey2, 1},
		{lockKey3, 0},
	}
	for _, e := range expectations {
		got, err := redis.Int(conn.Do("GET", e.key))
		assert.NoError(t, err)
		assert.Equal(t, e.want, got)
	}
}
34 changes: 34 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,37 @@ end
return cjson.encode(unknownPools)
`)

// redisRemoveDanglingLocksScript is used by the reaper to reconcile each job's
// lock counter with its lock_info hash. For every (lock, lock_info) key pair
// whose lock key exists, it sums the values of the lock_info hash and, when the
// lock counter differs from that sum, applies DECRBY with the difference so the
// counter becomes equal to the sum. Pairs whose lock key does not exist are
// skipped; an absent lock_info hash contributes a sum of 0.
//
// The script is created with a key count of -1, and removeDanglingLocks passes
// the number of keys as the first argument, followed by the keys themselves.
//
// KEYS are interleaved pairs, one pair per job type:
// KEYS[2i-1] = job's lock key
// KEYS[2i]   = the same job's lock info key
// Returns the lock keys that have been adjusted, e.g.
// ["ns:jobs:job1:lock", "ns:jobs:job3:lock"]
var redisRemoveDanglingLocksScript = redis.NewScript(-1, `
local danglingLocks = {}
for i=1,#KEYS,2 do
local lockKey = KEYS[i]
local lockInfoKey = KEYS[i+1]
local rlocks = redis.call('get', lockKey)
if rlocks ~= false then
local locks = tonumber(rlocks)
local lockInfo = redis.call('hvals', lockInfoKey)
local totalLocks = 0
for j=1,#lockInfo do
totalLocks = totalLocks + tonumber(lockInfo[j])
end
local diff = locks - totalLocks
if diff ~= 0 then
table.insert(danglingLocks, lockKey)
redis.call('decrby', lockKey, diff)
end
end
end
return danglingLocks
`)
Loading

0 comments on commit cf880cc

Please sign in to comment.