diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 71628d8..e8b30b7 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -3,6 +3,7 @@ module Que module Adapters class ActiveRecordWithLock < Que::Adapters::ActiveRecord + class NoLockableJobs < StandardError; end FindJobSecondsTotal = Prometheus::Client::Counter.new( :que_find_job_seconds_total, docstring: "Seconds spent finding a job", @@ -45,25 +46,25 @@ def execute(command, params = []) end def lock_job_with_lock_database(queue, cursor) - result = [] + job_to_lock = [] loop do - result = Que.transaction do - observe(nil, FindJobSecondsTotal, queue: queue) do - result = Que.execute(:find_job_to_lock, [queue, cursor]) - end + observe(duration_metric: FindJobSecondsTotal, labels: { queue: queue }) do + locked_job = Que.transaction do + job_to_lock = Que.execute(:find_job_to_lock, [queue, cursor]) - return result if result.empty? + return job_to_lock if job_to_lock.empty? - cursor = result.first["job_id"] - job_locked = pg_try_advisory_lock?(cursor) + cursor = job_to_lock.first["job_id"] + job_locked = pg_try_advisory_lock?(cursor) - observe(FindJobHitTotal, nil, { queue: queue, job_hit: job_locked }) - return result if job_locked + observe(count_metric: FindJobHitTotal, labels: { queue: queue, job_hit: job_locked }) + return job_to_lock if job_locked + end + return locked_job if locked_job end - break if result end - result + locked_job end def pg_try_advisory_lock?(job_id) @@ -85,12 +86,12 @@ def unlock_job(job_id) end end - def observe(metric, metric_duration, labels = {}) + def observe(count_metric: nil, duration_metric: nil, labels: {}) now = monotonic_now yield if block_given? ensure - metric&.increment(labels: labels) - metric_duration&.increment( + count_metric&.increment(labels: labels) + duration_metric&.increment( by: monotonic_now - now, labels: labels, ) diff --git a/lib/que/sql.rb b/lib/que/sql.rb index a6267ab..4af0dff 100644 --- a/lib/que/sql.rb +++ b/lib/que/sql.rb @@ -183,8 +183,9 @@ module Que AND run_at <= now() AND retryable = true AND job_id >= $2 - ORDER BY priority, run_at, job_id - for update skip locked LIMIT 1 + ORDER BY priority, run_at, job_id + FOR UPDATE SKIP LOCKED + LIMIT 1 }, } # rubocop:enable Style/MutableConstant diff --git a/spec/active_record_with_lock_spec_helper.rb b/spec/active_record_with_lock_spec_helper.rb index e6beb61..3c10fd0 100644 --- a/spec/active_record_with_lock_spec_helper.rb +++ b/spec/active_record_with_lock_spec_helper.rb @@ -4,7 +4,7 @@ class LockDatabaseRecord < ActiveRecord::Base establish_connection( adapter: "postgresql", host: ENV.fetch("LOCK_PGHOST", "localhost"), - user: ENV.fetch("LOCK_PGUSER", "ubuntu"), + user: ENV.fetch("LOCK_PGUSER", "postgres"), password: ENV.fetch("LOCK_PGPASSWORD", "password"), database: ENV.fetch("LOCK_PGDATABASE", "lock-test"), port: ENV.fetch("LOCK_PGPORT", 5434), @@ -28,11 +28,3 @@ def active_record_with_lock_adapter_connection lock_connection_pool: LockDatabaseRecord.connection_pool, ) end - -RSpec.configure do |config| - if ENV["ADAPTER"] == "ActiveRecordWithLock" - config.filter_run_including :active_record_with_lock - else - config.filter_run_excluding :active_record_with_lock - end -end diff --git a/spec/integration/integration_spec.rb b/spec/integration/integration_spec.rb index 37c9325..382fb94 100644 --- a/spec/integration/integration_spec.rb +++ b/spec/integration/integration_spec.rb @@ -5,24 +5,7 @@ # rubocop:disable RSpec/DescribeClass RSpec.describe "multiple workers" do - def with_workers(num, stop_timeout: 5, secondary_queues: [], &block) - Que::WorkerGroup.start( - num, - wake_interval: 0.01, - secondary_queues: secondary_queues, - ).tap(&block).stop(stop_timeout) - end - - # Wait for a maximum of [timeout] seconds for all jobs to be worked - def wait_for_jobs_to_be_worked(timeout: 10) - start = Time.now - loop do - break if QueJob.count == 0 || Time.now - start > timeout - - sleep 0.1 - end - end - + context "with one worker and many jobs" do it "works each job exactly once" do 10.times.each { |i| FakeJob.enqueue(i) } @@ -119,20 +102,6 @@ def wait_for_jobs_to_be_worked(timeout: 10) expect(User.count).to eq(3) expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie]) end - - it "increments the metrics", :active_record_with_lock do - CreateUser.enqueue("alice") - CreateUser.enqueue("bob") - CreateUser.enqueue("charlie") - expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment). - with({ :labels => { :job_hit => false, :queue => "default" } }).at_least(:once).and_call_original - expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment). - with({ :labels => { :job_hit => true, :queue => "default" } }). - exactly(3).times.and_call_original - expect(QueJob.count).to eq(3) - - with_workers(5) { wait_for_jobs_to_be_worked } - end end context "with jobs that exceed stop timeout" do diff --git a/spec/lib/que/adapters/active_record_with_lock_spec.rb b/spec/lib/que/adapters/active_record_with_lock_spec.rb new file mode 100644 index 0000000..03e8d60 --- /dev/null +++ b/spec/lib/que/adapters/active_record_with_lock_spec.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Que::Adapters::ActiveRecordWithLock do + subject(:adapter) do + described_class.new( job_connection_pool: JobRecord.connection_pool, + lock_connection_pool: LockDatabaseRecord.connection_pool + ) + end + around do |example| + Que.adapter.tap do |old_adapter| + # We need this to avoid errors related to prepared statements + if old_adapter.class != described_class + Que.adapter = adapter + example.run + Que.adapter = old_adapter + else + example.run + end + end + end + + before do + described_class::FindJobHitTotal.values.each { |labels, _| labels.clear } + 10.times do + FakeJob.enqueue(1) + end + end + + describe ".lock_job_with_lock_database" do + it "sets metric values from queue" do + with_workers(5) { wait_for_jobs_to_be_worked } + expect(described_class::FindJobHitTotal.values[{:queue=>"default", :job_hit=>"true"}]).to eql(10.0) + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1a09615..f3a5de6 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -57,3 +57,21 @@ def establish_database_connection Prometheus::Client.registry.instance_eval { @metrics.clear } end end + +def with_workers(num, stop_timeout: 5, secondary_queues: [], &block) + Que::WorkerGroup.start( + num, + wake_interval: 0.01, + secondary_queues: secondary_queues, + ).tap(&block).stop(stop_timeout) +end + +# Wait for a maximum of [timeout] seconds for all jobs to be worked +def wait_for_jobs_to_be_worked(timeout: 10) + start = Time.now + loop do + break if QueJob.count == 0 || Time.now - start > timeout + + sleep 0.1 + end +end