Skip to content

Commit

Permalink
FEATURE: Add MiniScheduler::Manager.discover_running_scheduled_jobs
Browse files Browse the repository at this point in the history
… API (discourse#56)

This commit adds the `MiniScheduler::Manager.discover_running_scheduled_jobs`
method which returns an array of all currently running scheduled jobs
for the current host.
  • Loading branch information
tgxworld authored Aug 8, 2024
1 parent 5aecec3 commit eec44cf
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 14 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 0.17.0 - 2024-08-06

- Add `MiniScheduler::Manager.discover_running_scheduled_jobs` API to allow running scheduled jobs to easily be discovered on the
current host.

# 0.16.0 - 2023-05-17

- Support Redis gem version 5
Expand All @@ -14,7 +19,7 @@
# 0.13.0 - 2020-11-30

- Fix exception code so it has parity with Sidekiq 4.2.3 and up, version bump cause
minimum version of Sikekiq changed.
minimum version of Sikekiq changed.

# 0.12.3 - 2020-10-15

Expand All @@ -35,7 +40,7 @@ minimum version of Sikekiq changed.
# 0.11.0 - 2019-06-24

- Correct situation where distributed mutex could end in a tight loop when
redis could not be contacted
redis could not be contacted

# 0.9.2 - 2019-04-26

Expand Down
75 changes: 69 additions & 6 deletions lib/mini_scheduler/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def worker_thread_ids

def process_queue
klass = @queue.deq

# hack alert, I need to both deq and set @running atomically.
@running = true

Expand Down Expand Up @@ -249,12 +250,7 @@ def self.current
end

def hostname
@hostname ||=
begin
`hostname`.strip
rescue StandardError
"unknown"
end
@hostname ||= self.class.hostname
end

def schedule_info(klass)
Expand Down Expand Up @@ -393,6 +389,73 @@ def self.discover_schedules
schedules
end

def self.hostname
@hostname ||=
begin
require "socket"
Socket.gethostname
rescue => e
begin
`hostname`.strip
rescue => e
"unknown_host"
end
end
end

# Discover running scheduled jobs on the current host.
#
# @example
#
# MiniScheduler::Manager.discover_running_scheduled_jobs
#
# @return [Array<Hash>] an array of hashes representing the running scheduled jobs.
# @option job [Class] :class The class of the scheduled job.
# @option job [Time] :started_at The time when the scheduled job started.
# @option job [String] :thread_id The ID of the worker thread running the job.
# The thread can be identified by matching the `:mini_scheduler_worker_thread_id` thread variable with the ID.
def self.discover_running_scheduled_jobs
hostname = self.hostname

schedule_keys =
discover_schedules.reduce({}) do |acc, klass|
acc[klass] = if klass.is_per_host
self.schedule_key(klass, hostname)
else
self.schedule_key(klass)
end

acc
end

running_scheduled_jobs = []

schedule_keys
.keys
.zip(MiniScheduler.redis.mget(*schedule_keys.values))
.each do |scheduled_job_class, scheduled_job_info|
next if scheduled_job_info.nil?

parsed =
begin
JSON.parse(scheduled_job_info, symbolize_names: true)
rescue JSON::ParserError
nil
end

next if parsed.nil?
next if parsed[:prev_result] != "RUNNING"

running_scheduled_jobs << {
class: scheduled_job_class,
started_at: Time.at(parsed[:prev_run]),
thread_id: parsed[:current_owner],
}
end

running_scheduled_jobs
end

@class_mutex = Mutex.new
def self.seq
@class_mutex.synchronize do
Expand Down
2 changes: 1 addition & 1 deletion lib/mini_scheduler/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module MiniScheduler
VERSION = "0.16.0"
VERSION = "0.17.0"
end
87 changes: 82 additions & 5 deletions spec/mini_scheduler/manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ def perform
end
end

class SuperLongPerHostJob
extend ::MiniScheduler::Schedule

per_host
every 20.minutes

def perform
sleep 1000
end
end

class PerHostJob
extend ::MiniScheduler::Schedule

Expand Down Expand Up @@ -77,11 +88,16 @@ def perform
end

after do
manager.stop!
manager.remove(Testing::RandomJob)
manager.remove(Testing::SuperLongJob)
manager.remove(Testing::PerHostJob)
manager.remove(Testing::FailingJob)
ObjectSpace
.each_object(described_class)
.each do |manager|
manager.stop!
manager.remove(Testing::RandomJob)
manager.remove(Testing::SuperLongJob)
manager.remove(Testing::PerHostJob)
manager.remove(Testing::FailingJob)
manager.remove(Testing::SuperLongPerHostJob)
end

# connections that are not in use must be removed
# otherwise active record gets super confused
Expand Down Expand Up @@ -312,6 +328,67 @@ def queued_jobs(manager, with_hostname:)
end
end

describe ".discover_running_scheduled_jobs" do
let(:manager_1) { MiniScheduler::Manager.new(enable_stats: false) }
let(:manager_2) { MiniScheduler::Manager.new(enable_stats: false) }

before do
freeze_time

info = manager_1.schedule_info(Testing::SuperLongJob)
info.next_run = Time.now.to_i - 1
info.write!

manager_1.tick

info = manager_2.schedule_info(Testing::SuperLongPerHostJob)
info.next_run = Time.now.to_i - 1
info.write!

manager_2.tick

wait_for do
manager_1.schedule_info(Testing::SuperLongJob).prev_result == "RUNNING" &&
manager_2.schedule_info(Testing::SuperLongPerHostJob).prev_result == "RUNNING"
end
end

after do
manager_1.stop!
manager_2.stop!
end

it "returns running jobs on current host" do
jobs = described_class.discover_running_scheduled_jobs

expect(jobs.size).to eq(2)

super_long_job = jobs.find { |job| job[:class] == Testing::SuperLongJob }

expect(super_long_job.keys).to eq(%i[class started_at thread_id])
expect(super_long_job[:started_at]).to be_within(1).of(Time.now)
expect(super_long_job[:thread_id]).to start_with("_scheduler_#{manager.hostname}")

expect(
Thread.list.find do |thread|
thread[:mini_scheduler_worker_thread_id] == super_long_job[:thread_id]
end,
).to be_truthy

super_long_per_host_job = jobs.find { |job| job[:class] == Testing::SuperLongPerHostJob }

expect(super_long_per_host_job.keys).to eq(%i[class started_at thread_id])
expect(super_long_per_host_job[:started_at]).to be_within(1).of(Time.now)
expect(super_long_per_host_job[:thread_id]).to start_with("_scheduler_#{manager.hostname}")

expect(
Thread.list.find do |thread|
thread[:mini_scheduler_worker_thread_id] == super_long_per_host_job[:thread_id]
end,
).to be_truthy
end
end

describe "#next_run" do
it "should be within the next 5 mins if it never ran" do
manager.remove(Testing::RandomJob)
Expand Down

0 comments on commit eec44cf

Please sign in to comment.