Skip to content

Commit

Permalink
Introduce poll-interval-variance option
Browse files Browse the repository at this point in the history
  • Loading branch information
tomgi committed Sep 20, 2024
1 parent 51403c0 commit 9c27735
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 58 deletions.
29 changes: 20 additions & 9 deletions lib/que/command_line_interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ def parse(
default_require_file: RAILS_ENVIRONMENT_FILE
)

options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
connection_url = nil
worker_count = nil
worker_priorities = nil
options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
poll_interval_variance = 0
connection_url = nil
worker_count = nil
worker_priorities = nil

parser =
OptionParser.new do |opts|
Expand All @@ -50,6 +51,15 @@ def parse(
poll_interval = i
end

opts.on(
'-j',
'--poll-interval-variance [INTERVAL]',
Float,
"Set maximum variance in poll interval, in seconds (default: 0)",
) do |j|
poll_interval_variance = j.to_f
end

opts.on(
'--listen [LISTEN]',
String,
Expand Down Expand Up @@ -232,7 +242,8 @@ def parse(
options[:queues] = queues_hash
end

options[:poll_interval] = poll_interval
options[:poll_interval] = poll_interval
options[:poll_interval_variance] = poll_interval_variance

locker =
begin
Expand Down
50 changes: 28 additions & 22 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class << self
}

class Locker
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :poll_interval_variance

MESSAGE_RESOLVERS = {}
RESULT_RESOLVERS = {}
Expand All @@ -47,22 +47,24 @@ class Locker
RESULT_RESOLVERS[:job_finished] =
-> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) }

DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze
DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_POLL_INTERVAL_VARIANCE = 0.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze

def initialize(
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
poll_interval_variance: DEFAULT_POLL_INTERVAL_VARIANCE,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
)

# Sanity-check all our arguments, since some users may instantiate Locker
Expand All @@ -71,6 +73,7 @@ def initialize(
Que.assert [TrueClass, FalseClass], poll

Que.assert Numeric, poll_interval
Que.assert Numeric, poll_interval_variance
Que.assert Numeric, wait_period

Que.assert Array, worker_priorities
Expand All @@ -94,20 +97,22 @@ def initialize(

Que.internal_log :locker_instantiate, self do
{
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
poll_interval_variance: poll_interval_variance,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
}
end

# Local cache of which advisory locks are held by this connection.
@locks = Set.new

@poll_interval = poll_interval
@poll_interval_variance = poll_interval_variance

if queues.is_a?(Hash)
@queue_names = queues.keys
Expand Down Expand Up @@ -207,6 +212,7 @@ def initialize(
connection: @connection,
queue: queue_name,
poll_interval: interval,
poll_interval_variance: poll_interval_variance,
)
end
end
Expand Down
50 changes: 31 additions & 19 deletions lib/que/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,25 +116,32 @@ class Poller
:connection,
:queue,
:poll_interval,
:poll_interval_variance,
:last_polled_at,
:last_poll_satisfied
:last_poll_satisfied,
:next_poll_at

def initialize(
connection:,
queue:,
poll_interval:
poll_interval:,
poll_interval_variance:
)
@connection = connection
@queue = queue
@poll_interval = poll_interval
@connection = connection
@queue = queue
@poll_interval = poll_interval
@poll_interval_variance = poll_interval_variance

@last_polled_at = nil
@last_poll_satisfied = nil
@next_poll_at = Time.now

Que.internal_log :poller_instantiate, self do
{
backend_pid: connection.backend_pid,
queue: queue,
poll_interval: poll_interval,
backend_pid: connection.backend_pid,
queue: queue,
poll_interval: poll_interval,
poll_interval_variance: poll_interval_variance,
}
end
end
Expand All @@ -158,31 +165,36 @@ def poll(

@last_polled_at = Time.now
@last_poll_satisfied = poll_satisfied?(priorities, jobs)
@next_poll_at = last_polled_at +
poll_interval +
rand(-poll_interval_variance..poll_interval_variance)

Que.internal_log :poller_polled, self do
{
queue: @queue,
locked: jobs.count,
priorities: priorities,
held_locks: held_locks.to_a,
newly_locked: jobs.map { |key| key.fetch(:id) },
queue: @queue,
locked: jobs.count,
priorities: priorities,
held_locks: held_locks.to_a,
newly_locked: jobs.map { |key| key.fetch(:id) },
last_polled_at: last_polled_at,
last_poll_satisfied: last_poll_satisfied,
next_poll_at: next_poll_at,
}
end

jobs.map! { |job| Metajob.new(job) }
end

def should_poll?
# polling is disabled for this queue
return false if poll_interval.nil?

# Never polled before?
last_poll_satisfied.nil? ||
# Plenty of jobs were available last time?
last_poll_satisfied == true ||
poll_interval_elapsed?
end

def poll_interval_elapsed?
return unless interval = poll_interval
(Time.now - last_polled_at) > interval
# It's due time to poll again regardless of the last poll's results?
next_poll_at < Time.now
end

class << self
Expand Down
24 changes: 17 additions & 7 deletions spec/que/command_line_interface_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def write_file
def assert_locker_instantiated(
worker_priorities: [10, 30, 50, nil, nil, nil],
poll_interval: 5,
poll_interval_variance: 0.0,
listen: true,
wait_period: 50,
queues: ['default'],
Expand All @@ -199,13 +200,14 @@ def assert_locker_instantiated(

locker_instantiate = locker_instantiates.first

assert_equal listen, locker_instantiate[:listen]
assert_equal true, locker_instantiate[:poll]
assert_equal queues, locker_instantiate[:queues]
assert_equal poll_interval, locker_instantiate[:poll_interval]
assert_equal wait_period, locker_instantiate[:wait_period]
assert_equal maximum_buffer_size, locker_instantiate[:maximum_buffer_size]
assert_equal worker_priorities, locker_instantiate[:worker_priorities]
assert_equal listen, locker_instantiate[:listen]
assert_equal true, locker_instantiate[:poll]
assert_equal queues, locker_instantiate[:queues]
assert_equal poll_interval, locker_instantiate[:poll_interval]
assert_equal poll_interval_variance, locker_instantiate[:poll_interval_variance]
assert_equal wait_period, locker_instantiate[:wait_period]
assert_equal maximum_buffer_size, locker_instantiate[:maximum_buffer_size]
assert_equal worker_priorities, locker_instantiate[:worker_priorities]
end

def assert_locker_started(
Expand Down Expand Up @@ -258,6 +260,14 @@ def assert_locker_started(
end
end

["-j", "--poll-interval-variance"].each do |command|
it "with #{command} to configure the poll interval variance" do
assert_successful_invocation "./#{filename} #{command} 5"
assert_locker_instantiated(poll_interval_variance: 5)
assert_locker_started
end
end

it "with --listen false to disable listen mode" do
assert_successful_invocation "./#{filename} --listen false"
assert_locker_instantiated(listen: false)
Expand Down
2 changes: 1 addition & 1 deletion spec/que/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def assert_poll(priorities:, locked:)
assert_equal false, poller.should_poll?
end

it "should be false if the number of jobs returned from the last poll was less than the lowest priority request, but the poll_interval has elapsed" do
it "should be true if the number of jobs returned from the last poll was less than the lowest priority request, but the poll_interval has elapsed" do
job_ids = 5.times.map { Que::Job.enqueue.que_attrs[:id] }

result = poller.poll(priorities: { 500 => 7 }, held_locks: Set.new)
Expand Down

0 comments on commit 9c27735

Please sign in to comment.