diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 7dbd41b..f23ad21 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -3,18 +3,19 @@ module Que module Adapters class ActiveRecordWithLock < Que::Adapters::ActiveRecord - class NoLockableJobs < StandardError; end - FindJobSecondsTotal = Prometheus::Client::Counter.new( - :que_find_job_seconds_total, - docstring: "Seconds spent finding a job", - labels: %i[queue], - ) + METRICS = [ + FindJobSecondsTotal = Prometheus::Client::Counter.new( + :que_find_job_seconds_total, + docstring: "Seconds spent finding a job", + labels: %i[queue], + ), - FindJobHitTotal = Prometheus::Client::Counter.new( - :que_find_job_total, - docstring: "total number of job hit and misses when acquiring a lock", - labels: %i[queue job_hit], - ) + FindJobHitTotal = Prometheus::Client::Counter.new( + :que_find_job_hit_total, + docstring: "total number of job hit and misses when acquiring a lock", + labels: %i[queue job_hit], + ), + ].freeze def initialize(job_connection_pool:, lock_connection_pool:) @job_connection_pool = job_connection_pool @@ -50,9 +51,8 @@ def execute(command, params = []) def lock_job_with_lock_database(queue, cursor) loop do observe(duration_metric: FindJobSecondsTotal, labels: { queue: queue }) do - locked_job = Que.transaction do + Que.transaction do job_to_lock = Que.execute(:find_job_to_lock, [queue, cursor]) - return job_to_lock if job_to_lock.empty? cursor = job_to_lock.first["job_id"] @@ -61,7 +61,6 @@ def lock_job_with_lock_database(queue, cursor) observe(count_metric: FindJobHitTotal, labels: { queue: queue, job_hit: job_locked }) return job_to_lock if job_locked end - return locked_job if locked_job end end end diff --git a/lib/que/middleware/worker_collector.rb b/lib/que/middleware/worker_collector.rb index 9d498b9..c90fb70 100644 --- a/lib/que/middleware/worker_collector.rb +++ b/lib/que/middleware/worker_collector.rb @@ -18,6 +18,7 @@ def initialize(app, options = {}) register(*WorkerGroup::METRICS) register(*Worker::METRICS) register(*Locker::METRICS) + register(*Adapters::ActiveRecordWithLock::METRICS) end def call(env) diff --git a/lib/que/worker.rb b/lib/que/worker.rb index 2f644ec..0007813 100644 --- a/lib/que/worker.rb +++ b/lib/que/worker.rb @@ -170,6 +170,7 @@ def work_loop def work Que.adapter.checkout do @locker.with_locked_job do |job| + return :job_not_found if job.nil? log_keys = { diff --git a/spec/lib/que/adapters/active_record_with_lock_spec.rb b/spec/lib/que/adapters/active_record_with_lock_spec.rb index 73607e9..cb37b5c 100644 --- a/spec/lib/que/adapters/active_record_with_lock_spec.rb +++ b/spec/lib/que/adapters/active_record_with_lock_spec.rb @@ -2,25 +2,12 @@ require "spec_helper" -RSpec.describe Que::Adapters::ActiveRecordWithLock do +RSpec.describe Que::Adapters::ActiveRecordWithLock, :active_record_with_lock do subject(:adapter) do described_class.new(job_connection_pool: JobRecord.connection_pool, lock_connection_pool: LockDatabaseRecord.connection_pool) end - around do |example| - Que.adapter.tap do |old_adapter| - # We need this to avoid errors related to prepared statements - if old_adapter.class != described_class - Que.adapter = adapter - example.run - Que.adapter = old_adapter - else - example.run - end - end - end - before do described_class::FindJobHitTotal.values.each { |labels, _| labels.clear } end @@ -33,7 +20,9 @@ end it "sets correct metric values" do + expect(QueJob.count).to eq(10) with_workers(5) { wait_for_jobs_to_be_worked } + expect(QueJob.count).to eq(0) expect(described_class::FindJobHitTotal.values[{ :queue => "default", :job_hit => "true" }]).to eq(10.0) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index f3a5de6..8baa994 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -44,6 +44,19 @@ def establish_database_connection Que.logger = Logger.new("/dev/null") RSpec.configure do |config| + # Run only specific adapter files based on the adapter class + spec_dir = "./spec/lib" + # Construct the path for the adapter spec file + adapter_spec_class_path = File.join(spec_dir, "#{Que.adapter.class.to_s.underscore}_spec.rb") + + # Exclude patterns for tests in the que/adapters directory + config.exclude_pattern = "**/que/adapters/*.rb" + + # Require the adapter spec file if it exists + if File.exist?(adapter_spec_class_path) + require adapter_spec_class_path + end + config.before do QueJob.delete_all FakeJob.log = []