Skip to content

Commit

Permalink
support Sidekiq 7+
Browse files Browse the repository at this point in the history
  • Loading branch information
dmke committed Nov 13, 2024
1 parent eec44cf commit 0186b20
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 46 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
push:
branches:
- main
- sidekiq-7

jobs:
lint:
Expand Down Expand Up @@ -33,13 +34,14 @@ jobs:

services:
redis:
image: redis
image: ${{ matrix.redis }}
ports:
- 6379:6379

strategy:
matrix:
ruby: ["3.1", "3.2", "3.3"]
ruby: ["3.2", "3.3"]
redis: ["redis", "valkey/valkey"]

steps:
- uses: actions/checkout@v4
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# next

- Remove support for Sidekiq < 7
- Remove support for Redis gem
- Add support for RedisClient gem
- Update minimum Ruby version to 3.2

# 0.17.0 - 2024-08-06

- Add `MiniScheduler::Manager.discover_running_scheduled_jobs` API to allow running scheduled jobs to easily be discovered on the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

MiniScheduler.configure do |config|
# An instance of Redis. See https://github.com/redis/redis-rb
# An instance of RedisClient. See https://github.com/redis-rb/redis

# config.redis = $redis

Expand Down
14 changes: 7 additions & 7 deletions lib/mini_scheduler/distributed_mutex.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,26 @@ def synchronize

yield
ensure
@redis.del @key
@redis.call "del", @key
@mutex.unlock
end

private

def try_to_get_lock
got_lock = false
if @redis.setnx @key, Time.now.to_i + 60
@redis.expire @key, 60
if @redis.call "set", @key, nx: Time.now.to_i + 60
@redis.call "expire", @key, 60
got_lock = true
else
begin
@redis.watch @key
time = @redis.get @key
@redis.call "watch", @key
time = @redis.call "get", @key
if time && time.to_i < Time.now.to_i
got_lock = @redis.multi { @redis.set @key, Time.now.to_i + 60 }
got_lock = @redis.multi { @redis.call "set", @key, Time.now.to_i + 60 }
end
ensure
@redis.unwatch
@redis.call "unwatch"
end
end

Expand Down
17 changes: 9 additions & 8 deletions lib/mini_scheduler/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,14 @@ def reschedule_orphans!

def reschedule_orphans_on!(hostname = nil)
redis
.zrange(Manager.queue_key(queue, hostname), 0, -1)
.call("zrange", Manager.queue_key(queue, hostname), 0, -1)
.each do |key|
klass = get_klass(key)
next unless klass
info = schedule_info(klass)

if %w[QUEUED RUNNING].include?(info.prev_result) &&
(!info.current_owner || !redis.get(info.current_owner))
(!info.current_owner || !redis.call("get", info.current_owner))
info.prev_result = "ORPHAN"
info.next_run = Time.now.to_i
info.write!
Expand All @@ -300,8 +300,8 @@ def get_klass(name)
end

def repair_queue
if redis.exists?(self.class.queue_key(queue)) ||
redis.exists?(self.class.queue_key(queue, hostname))
if redis.call("exists", self.class.queue_key(queue)) > 0 ||
redis.call("exists", self.class.queue_key(queue, hostname)) > 0
return
end

Expand All @@ -320,14 +320,15 @@ def tick
end

def schedule_next_job(hostname = nil)
(key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
(key, due), _ =
redis.call "zrange", Manager.queue_key(queue, hostname), 0, 0, withscores: true
return unless key

if due.to_i <= Time.now.to_i
klass = get_klass(key)
if !klass || ((klass.is_per_host && !hostname) || (hostname && !klass.is_per_host))
# corrupt key, nuke it (renamed job or something)
redis.zrem Manager.queue_key(queue, hostname), key
redis.call "zrem", Manager.queue_key(queue, hostname), key
return
end

Expand Down Expand Up @@ -358,7 +359,7 @@ def keep_alive_duration

def keep_alive(*ids)
ids = [identity_key, *@runner.worker_thread_ids] if ids.size == 0
ids.each { |identity_key| redis.setex identity_key, keep_alive_duration, "" }
ids.each { |identity_key| redis.call "set", identity_key, ex: keep_alive_duration }
end

def lock
Expand Down Expand Up @@ -432,7 +433,7 @@ def self.discover_running_scheduled_jobs

schedule_keys
.keys
.zip(MiniScheduler.redis.mget(*schedule_keys.values))
.zip(MiniScheduler.redis.call("mget", *schedule_keys.values))
.each do |scheduled_job_class, scheduled_job_info|
next if scheduled_job_info.nil?

Expand Down
27 changes: 14 additions & 13 deletions lib/mini_scheduler/schedule_info.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(klass, manager)

data = nil

if data = @manager.redis.get(key)
if data = @manager.redis.call("get", key)
data = JSON.parse(data)
end

Expand Down Expand Up @@ -87,16 +87,17 @@ def schedule!

def write!
clear!
redis.set key,
{
next_run: @next_run,
prev_run: @prev_run,
prev_duration: @prev_duration,
prev_result: @prev_result,
current_owner: @current_owner,
}.to_json

redis.zadd queue_key, @next_run.to_s, @klass.to_s if @next_run
redis.call "set",
key,
{
next_run: @next_run,
prev_run: @prev_run,
prev_duration: @prev_duration,
prev_result: @prev_result,
current_owner: @current_owner,
}.to_json

redis.call "zadd", queue_key, @next_run.to_s, @klass.to_s if @next_run
end

def del!
Expand Down Expand Up @@ -127,8 +128,8 @@ def redis
private

def clear!
redis.del key
redis.zrem queue_key, @klass.to_s
redis.call "del", key
redis.call "zrem", queue_key, @klass.to_s
end
end
end
5 changes: 2 additions & 3 deletions mini_scheduler.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@ Gem::Specification.new do |spec|
spec.homepage = "https://github.com/discourse/mini_scheduler"
spec.license = "MIT"

spec.required_ruby_version = ">= 2.7.0"
spec.required_ruby_version = ">= 3.2.0"

spec.files = `git ls-files`.split($/).reject { |s| s =~ /^(spec|\.)/ }
spec.require_paths = ["lib"]

spec.add_runtime_dependency "sidekiq", ">= 4.2.3", "< 7.0"
spec.add_runtime_dependency "sidekiq", ">= 7.0", "< 8.0"

spec.add_development_dependency "pg", "~> 1.0"
spec.add_development_dependency "activesupport", "~> 7.0"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "mocha", "~> 2.0"
spec.add_development_dependency "guard", "~> 2.0"
spec.add_development_dependency "guard-rspec", "~> 4.0"
spec.add_development_dependency "redis", ">= 4.0"
spec.add_development_dependency "rake", "~> 13.0"
spec.add_development_dependency "rubocop-discourse", "= 3.8.1"
spec.add_development_dependency "syntax_tree"
Expand Down
25 changes: 15 additions & 10 deletions spec/mini_scheduler/manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def perform

let(:manager) { MiniScheduler::Manager.new(enable_stats: false) }

let(:redis) { Redis.new }
let(:redis) { RedisClient.new }

before do
MiniScheduler.configure { |config| config.redis = redis }
Expand Down Expand Up @@ -136,7 +136,7 @@ def perform

wait_for(on_fail: on_thread_mismatch) { @thread_count == Thread.list.count }

redis.flushdb
redis.call "flushdb"
end

it "can disable stats" do
Expand Down Expand Up @@ -186,9 +186,9 @@ def perform

describe "#tick" do
it "should nuke missing jobs" do
redis.zadd MiniScheduler::Manager.queue_key("default"), Time.now.to_i - 1000, "BLABLA"
redis.call "zadd", MiniScheduler::Manager.queue_key("default"), Time.now.to_i - 1000, "BLABLA"
manager.tick
expect(redis.zcard(MiniScheduler::Manager.queue_key("default"))).to eq(0)
expect(redis.call("zcard", MiniScheduler::Manager.queue_key("default"))).to eq(0)
end

context "when manager is stopped" do
Expand All @@ -206,7 +206,7 @@ def perform
manager.tick
manager.stop!

redis.del manager.identity_key
redis.call "del", manager.identity_key

manager = MiniScheduler::Manager.new(enable_stats: false, workers: 0)
manager.reschedule_orphans!
Expand All @@ -229,7 +229,7 @@ def perform
wait_for { manager.schedule_info(Testing::SuperLongJob).prev_result == "RUNNING" }

# Simulate redis failure while job is running
MiniScheduler::ScheduleInfo.any_instance.stubs(:write!).raises(Redis::CommandError)
MiniScheduler::ScheduleInfo.any_instance.stubs(:write!).raises(RedisClient::CommandError)

runner = manager.instance_variable_get(:@runner)
worker_threads = runner.instance_variable_get(:@threads)
Expand All @@ -253,8 +253,8 @@ def perform

# Simulate time passing and redis keys expiring
worker_thread_ids.each do |id|
expect(manager.redis.ttl(id)).to be > 30
manager.redis.del(id)
expect(manager.redis.call("ttl", id)).to be > 30
manager.redis.call("del", id)
end

manager.reschedule_orphans!
Expand All @@ -272,7 +272,7 @@ def perform
def queued_jobs(manager, with_hostname:)
hostname = with_hostname ? manager.hostname : nil
key = MiniScheduler::Manager.queue_key(manager.queue, hostname)
redis.zrange(key, 0, -1).map(&:constantize)
redis.call("zrange", key, 0, -1).map(&:constantize)
end

it "should recover from Redis flush" do
Expand All @@ -283,7 +283,12 @@ def queued_jobs(manager, with_hostname:)
expect(queued_jobs(manager, with_hostname: false)).to include(Testing::SuperLongJob)
expect(queued_jobs(manager, with_hostname: true)).to include(Testing::PerHostJob)

redis.scan_each(match: "_scheduler_*") { |key| redis.del(key) }
cursor = 0
loop do
cursor, keys = redis.scan(cursor, match: "_scheduler_*")
keys.each { |key| redis.call("del", key) }
break if cursor == "0"
end

expect(queued_jobs(manager, with_hostname: false)).to be_empty
expect(queued_jobs(manager, with_hostname: true)).to be_empty
Expand Down
2 changes: 1 addition & 1 deletion spec/mini_scheduler/schedule_info_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

describe MiniScheduler::ScheduleInfo do
let(:manager) { MiniScheduler::Manager.new }
let(:redis) { Redis.new }
let(:redis) { RedisClient.new }

before { MiniScheduler.configure { |config| config.redis = redis } }

Expand Down
2 changes: 1 addition & 1 deletion spec/mini_scheduler/slow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def perform
end

describe MiniScheduler::Manager do
let(:redis) { Redis.new }
let(:redis) { RedisClient.new }

it "can correctly operate with multiple workers" do
MiniScheduler.configure { |config| config.redis = redis }
Expand Down

0 comments on commit 0186b20

Please sign in to comment.