diff --git a/lib/que.rb b/lib/que.rb index 7685128..6768d52 100644 --- a/lib/que.rb +++ b/lib/que.rb @@ -67,7 +67,7 @@ def connection=(connection) Adapters::ActiveRecordWithLock.new( job_connection_pool: connection.job_connection_pool, - lock_connection_pool: connection.lock_connection_pool + lock_connection: connection.lock_connection ) when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection) when "ConnectionPool" then Adapters::ConnectionPool.new(connection) diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 59228eb..f10b38d 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -4,10 +4,10 @@ module Que module Adapters class ActiveRecordWithLock < Que::Adapters::ActiveRecord - attr_accessor :job_connection_pool, :lock_connection_pool - def initialize(job_connection_pool:, lock_connection_pool:) + attr_accessor :job_connection_pool, :lock_connection + def initialize(job_connection_pool:, lock_connection:) @job_connection_pool = job_connection_pool - @lock_connection_pool = lock_connection_pool + @lock_connection = lock_connection super end @@ -15,19 +15,8 @@ def checkout_activerecord_adapter(&block) @job_connection_pool.with_connection(&block) end - def checkout_lock_database_connection - # when multiple threads are running we need to make sure - # the acquiring and releasing of advisory locks is done by the - # same connection - Thread.current[:db_connection] ||= lock_connection_pool.checkout - end - def lock_database_connection - Thread.current[:db_connection] - end - - def release_lock_database_connection - @lock_connection_pool.checkin(Thread.current[:db_connection]) + Thread.current[:db_connection] ||= @lock_connection.connection end def execute(command, params=[]) diff --git a/lib/que/worker.rb b/lib/que/worker.rb index c79f81e..a511e0e 100644 --- a/lib/que/worker.rb +++ b/lib/que/worker.rb @@ -134,7 +134,7 @@ def initialize( cursor_expiry: lock_cursor_expiry, window: lock_window, budget: lock_budget, - secondary_queues: secondary_queues + secondary_queues: secondary_queues, ) end @@ -142,7 +142,7 @@ def initialize( def work_loop return if @stop - Que.adapter.checkout_lock_database_connection if Que.adapter.class == Que::Adapters::ActiveRecordWithLock + @tracer.trace(RunningSecondsTotal, queue: @queue, primary_queue: @queue) do loop do case event = work @@ -160,10 +160,7 @@ def work_loop nil # immediately find a new job to work end - if @stop - Que.adapter.release_lock_database_connection if Que.adapter.class == Que::Adapters::ActiveRecordWithLock - break - end + break if @stop end end ensure diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 8c26348..08167b1 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -57,7 +57,7 @@ class YugabyteRecord < ActiveRecord::Base if ENV['YUGABYTE_QUE_WORKER_ENABLED'] Que.connection = Que::Adapters::ActiveRecordWithLock.new( job_connection_pool: YugabyteRecord.connection_pool, - lock_connection_pool: LockDatabaseRecord.connection_pool, + lock_connection: LockDatabaseRecord, ) else Que.connection = ActiveRecord @@ -70,24 +70,6 @@ class YugabyteRecord < ActiveRecord::Base RSpec.configure do |config| - # config.before(:each, :with_yugabyte_adapter) do - # Que.adapter.cleanup! - # Que.connection = Que::Adapters::Yugabyte - # end - - # config.after(:each, :with_yugabyte_adapter) do - # Que.adapter.cleanup! - # Que.connection = ActiveRecord - # end - if ENV['YUGABYTE_QUE_WORKER_ENABLED'] - config.before(:all) do - Que.adapter.checkout_lock_database_connection - end - config.after(:all) do - LockDatabaseRecord.connection_pool.disconnect! - end - end - config.before do QueJob.delete_all FakeJob.log = []