From 8f096afb1307d645e45de9a3045925b0b663b4e3 Mon Sep 17 00:00:00 2001 From: Ankitha Damodara Date: Thu, 27 Jun 2024 15:18:18 +0100 Subject: [PATCH] pass the connection pool on initialise the adapter --- lib/que.rb | 8 ++++-- ...yugabyte.rb => active_record_with_lock.rb} | 26 +++++++++++-------- lib/que/adapters/base.rb | 2 +- lib/que/locker.rb | 2 +- spec/spec_helper.rb | 13 ++++------ 5 files changed, 28 insertions(+), 23 deletions(-) rename lib/que/adapters/{yugabyte.rb => active_record_with_lock.rb} (67%) diff --git a/lib/que.rb b/lib/que.rb index 400c10d..7685128 100644 --- a/lib/que.rb +++ b/lib/que.rb @@ -61,10 +61,14 @@ def connection=(connection) self.adapter = if connection.to_s == "ActiveRecord" Adapters::ActiveRecord.new - elsif connection.to_s == "Que::Adapters::Yugabyte" - Adapters::Yugabyte.new else case connection.class.to_s + when "Que::Adapters::ActiveRecordWithLock" then + + Adapters::ActiveRecordWithLock.new( + job_connection_pool: connection.job_connection_pool, + lock_connection_pool: connection.lock_connection_pool + ) when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection) when "ConnectionPool" then Adapters::ConnectionPool.new(connection) when "PG::Connection" then Adapters::PG.new(connection) diff --git a/lib/que/adapters/yugabyte.rb b/lib/que/adapters/active_record_with_lock.rb similarity index 67% rename from lib/que/adapters/yugabyte.rb rename to lib/que/adapters/active_record_with_lock.rb index b6e6787..fb5cba5 100644 --- a/lib/que/adapters/yugabyte.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -3,20 +3,23 @@ # https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb module Que module Adapters - class Yugabyte < Que::Adapters::ActiveRecord - def initialize + class ActiveRecordWithLock < Que::Adapters::ActiveRecord + attr_accessor :job_connection_pool, :lock_connection_pool + def initialize(job_connection_pool:, lock_connection_pool:) + @job_connection_pool = job_connection_pool + @lock_connection_pool = lock_connection_pool super end def checkout_activerecord_adapter(&block) - YugabyteRecord.connection_pool.with_connection(&block) + @job_connection_pool.with_connection(&block) end def checkout_lock_database_connection # when multiple threads are running we need to make sure # the acquiring and releasing of advisory locks is done by the # same connection - Thread.current[:db_connection] ||= LockDatabaseRecord.connection_pool.checkout + Thread.current[:db_connection] ||= lock_connection_pool.checkout end def lock_database_connection @@ -24,14 +27,15 @@ def lock_database_connection end def release_lock_database_connection - LockDatabaseRecord.connection_pool.checkin(Thread.current[:db_connection]) + @lock_connection_pool.checkin(Thread.current[:db_connection]) end def execute(command, params=[]) - if command == :lock_job + case command + when :lock_job then queue, cursor = params lock_job_with_lock_database(queue, cursor) - elsif command == :unlock_job + when :unlock_job then job_id = params[0] unlock_job(job_id) else @@ -43,7 +47,7 @@ def lock_job_with_lock_database(queue, cursor) result = Que.execute(:find_job_to_lock, [queue, cursor]) return result if result.empty? - if locked?(result.first['job_id']) + if pg_try_advisory_lock?(result.first['job_id']) return result end @@ -52,11 +56,11 @@ def lock_job_with_lock_database(queue, cursor) end def cleanup! - YugabyteRecord.connection_pool.release_connection - LockDatabaseRecord.connection_pool.release_connection + @job_connection_pool.release_connection + @lock_connection_pool.release_connection end - def locked?(job_id) + def pg_try_advisory_lock?(job_id) lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").try(:first)&.fetch('pg_try_advisory_lock') end diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index 41b3bdb..3684ebc 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -9,7 +9,7 @@ module Adapters autoload :PG, "que/adapters/pg" autoload :Pond, "que/adapters/pond" autoload :Sequel, "que/adapters/sequel" - autoload :Yugabyte, "que/adapters/yugabyte" + autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock" class UnavailableConnection < StandardError; end diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 3f7742b..27e2f90 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -153,7 +153,7 @@ def lock_job_query(queue, cursor) def handle_expired_cursors! @consolidated_queues.each do |queue| - queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) + queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index cb476a4..8c26348 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,7 +13,7 @@ require_relative "./helpers/sleep_job" require_relative "./helpers/interruptible_sleep_job" require_relative "./helpers/user" -require_relative "../lib/que/adapters/yugabyte" + def postgres_now ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"] @@ -44,7 +44,6 @@ class LockDatabaseRecord < ActiveRecord::Base end class YugabyteRecord < ActiveRecord::Base - def self.establish_lock_database_connection establish_connection( adapter: "postgresql", host: ENV.fetch("PGHOST", "localhost"), @@ -52,15 +51,14 @@ def self.establish_lock_database_connection password: ENV.fetch("PGPASSWORD", "password"), database: ENV.fetch("PGDATABASE", "que-test"), ) - end - def self.connection - establish_lock_database_connection.connection - end end # Make sure our test database is prepared to run Que if ENV['YUGABYTE_QUE_WORKER_ENABLED'] - Que.connection = Que::Adapters::Yugabyte + Que.connection = Que::Adapters::ActiveRecordWithLock.new( + job_connection_pool: YugabyteRecord.connection_pool, + lock_connection_pool: LockDatabaseRecord.connection_pool, + ) else Que.connection = ActiveRecord end @@ -91,7 +89,6 @@ def self.connection end config.before do - LockDatabaseRecord.connection_pool.release_connection QueJob.delete_all FakeJob.log = [] ExceptionalJob.log = []