From 184c0b7dd84813bc5a7374559b6ca49d05d9929f Mon Sep 17 00:00:00 2001 From: Ankitha Damodara Date: Mon, 2 Sep 2024 15:27:27 +0100 Subject: [PATCH] refactor and address comments --- lib/que/sql.rb | 5 +-- spec/active_record_with_lock_spec_helper.rb | 10 +----- spec/integration/integration_spec.rb | 32 ------------------- .../adapters/active_record_with_lock_spec.rb | 26 +++++++++++++++ spec/spec_helper.rb | 18 +++++++++++ 5 files changed, 48 insertions(+), 43 deletions(-) create mode 100644 spec/lib/que/adapters/active_record_with_lock_spec.rb diff --git a/lib/que/sql.rb b/lib/que/sql.rb index a6267ab..4af0dff 100644 --- a/lib/que/sql.rb +++ b/lib/que/sql.rb @@ -183,8 +183,9 @@ module Que AND run_at <= now() AND retryable = true AND job_id >= $2 - ORDER BY priority, run_at, job_id - for update skip locked LIMIT 1 + ORDER BY priority, run_at, job_id + FOR UPDATE SKIP LOCKED + LIMIT 1 }, } # rubocop:enable Style/MutableConstant diff --git a/spec/active_record_with_lock_spec_helper.rb b/spec/active_record_with_lock_spec_helper.rb index e6beb61..3c10fd0 100644 --- a/spec/active_record_with_lock_spec_helper.rb +++ b/spec/active_record_with_lock_spec_helper.rb @@ -4,7 +4,7 @@ class LockDatabaseRecord < ActiveRecord::Base establish_connection( adapter: "postgresql", host: ENV.fetch("LOCK_PGHOST", "localhost"), - user: ENV.fetch("LOCK_PGUSER", "ubuntu"), + user: ENV.fetch("LOCK_PGUSER", "postgres"), password: ENV.fetch("LOCK_PGPASSWORD", "password"), database: ENV.fetch("LOCK_PGDATABASE", "lock-test"), port: ENV.fetch("LOCK_PGPORT", 5434), @@ -28,11 +28,3 @@ def active_record_with_lock_adapter_connection lock_connection_pool: LockDatabaseRecord.connection_pool, ) end - -RSpec.configure do |config| - if ENV["ADAPTER"] == "ActiveRecordWithLock" - config.filter_run_including :active_record_with_lock - else - config.filter_run_excluding :active_record_with_lock - end -end diff --git a/spec/integration/integration_spec.rb b/spec/integration/integration_spec.rb index 37c9325..f1106d4 100644 --- a/spec/integration/integration_spec.rb +++ b/spec/integration/integration_spec.rb @@ -5,24 +5,6 @@ # rubocop:disable RSpec/DescribeClass RSpec.describe "multiple workers" do - def with_workers(num, stop_timeout: 5, secondary_queues: [], &block) - Que::WorkerGroup.start( - num, - wake_interval: 0.01, - secondary_queues: secondary_queues, - ).tap(&block).stop(stop_timeout) - end - - # Wait for a maximum of [timeout] seconds for all jobs to be worked - def wait_for_jobs_to_be_worked(timeout: 10) - start = Time.now - loop do - break if QueJob.count == 0 || Time.now - start > timeout - - sleep 0.1 - end - end - context "with one worker and many jobs" do it "works each job exactly once" do 10.times.each { |i| FakeJob.enqueue(i) } @@ -119,20 +101,6 @@ def wait_for_jobs_to_be_worked(timeout: 10) expect(User.count).to eq(3) expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie]) end - - it "increments the metrics", :active_record_with_lock do - CreateUser.enqueue("alice") - CreateUser.enqueue("bob") - CreateUser.enqueue("charlie") - expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment). - with({ :labels => { :job_hit => false, :queue => "default" } }).at_least(:once).and_call_original - expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment). - with({ :labels => { :job_hit => true, :queue => "default" } }). - exactly(3).times.and_call_original - expect(QueJob.count).to eq(3) - - with_workers(5) { wait_for_jobs_to_be_worked } - end end context "with jobs that exceed stop timeout" do diff --git a/spec/lib/que/adapters/active_record_with_lock_spec.rb b/spec/lib/que/adapters/active_record_with_lock_spec.rb new file mode 100644 index 0000000..a150d51 --- /dev/null +++ b/spec/lib/que/adapters/active_record_with_lock_spec.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Que::Adapters::ActiveRecordWithLock do + subject(:adapter) do + described_class.new( job_connection_pool: JobRecord.connection_pool, + lock_connection_pool: LockDatabaseRecord.connection_pool + ) + end + + before do + Que.connection = adapter + 10.times do + FakeJob.enqueue(1) + end + end + + describe ".lock_job_with_lock_database" do + subject(:lock_job) { adapter.lock_job_with_lock_database("default", 0) } + it "sets metric values from queue" do + with_workers(5) { wait_for_jobs_to_be_worked } + expect(described_class::FindJobHitTotal.values[{:queue=>"default", :job_hit=>"true"}]).to eql(10.0) + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1a09615..f3a5de6 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -57,3 +57,21 @@ def establish_database_connection Prometheus::Client.registry.instance_eval { @metrics.clear } end end + +def with_workers(num, stop_timeout: 5, secondary_queues: [], &block) + Que::WorkerGroup.start( + num, + wake_interval: 0.01, + secondary_queues: secondary_queues, + ).tap(&block).stop(stop_timeout) +end + +# Wait for a maximum of [timeout] seconds for all jobs to be worked +def wait_for_jobs_to_be_worked(timeout: 10) + start = Time.now + loop do + break if QueJob.count == 0 || Time.now - start > timeout + + sleep 0.1 + end +end