Skip to content

Commit

Permalink
Merge pull request #18 from SberMarket-Tech/slow-periodic-job
Browse files Browse the repository at this point in the history
Remove stale periodic job from queue on requeue cycle
  • Loading branch information
hageshtrem authored Jul 10, 2023
2 parents 891e767 + e301728 commit 16f007b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
14 changes: 13 additions & 1 deletion redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,26 @@ if #res > 0 then
redis.call('zrem', KEYS[1], res[1])
queue = ARGV[1] .. j['name']
for _,v in pairs(KEYS) do
for i=3,#KEYS do
local v = KEYS[i]
if v == queue then
-- If for some reason (e.g., the service was offline) the periodic job was
-- not executed, skip the execution.
if j['d'] ~= nil and nowTs > j['d'] then
return 'ok'
end
-- If the next task in the queue has expired, discard it.
local nextTask = redis.call('rpop', queue)
if nextTask then
local decNextTask = cjson.decode(nextTask)
local deadline = decNextTask['d']
-- Return to the queue if the deadline is not set or has not expired.
if deadline == nil or nowTs < deadline then
redis.call('rpush', queue, nextTask)
end
end
j['t'] = nowTs
redis.call('lpush', queue, cjson.encode(j))
Expand Down
37 changes: 37 additions & 0 deletions requeuer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package work

import (
"context"
"testing"
"time"

"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -112,3 +114,38 @@ func TestRequeuePeriodic(t *testing.T) {
llen := listSize(pool, redisKeyJobs(ns, jobName))
assert.Equal(t, int64(0), llen)
}

func TestRequeueSlowJob(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)

jobName := "test_job"
jobSpec := "*/1 * * * * *"

wp := NewWorkerPool(struct{}{}, 1, ns, pool)
defer wp.Stop()

wp.PeriodicallyEnqueue(jobSpec, jobName)

block := make(chan struct{})
runned := make(chan struct{})

wp.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, func(context.Context, *Job) error {
close(runned)
<-block
return nil
})

wp.Start()
defer func() {
close(block)
wp.Stop()
}()
<-runned

time.Sleep(time.Second * 2)

llen := listSize(pool, redisKeyJobs(ns, jobName))
assert.LessOrEqual(t, llen, int64(1))
}

0 comments on commit 16f007b

Please sign in to comment.