diff --git a/lib/que.rb b/lib/que.rb index 400c10d..7685128 100644 --- a/lib/que.rb +++ b/lib/que.rb @@ -61,10 +61,14 @@ def connection=(connection) self.adapter = if connection.to_s == "ActiveRecord" Adapters::ActiveRecord.new - elsif connection.to_s == "Que::Adapters::Yugabyte" - Adapters::Yugabyte.new else case connection.class.to_s + when "Que::Adapters::ActiveRecordWithLock" then + + Adapters::ActiveRecordWithLock.new( + job_connection_pool: connection.job_connection_pool, + lock_connection_pool: connection.lock_connection_pool + ) when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection) when "ConnectionPool" then Adapters::ConnectionPool.new(connection) when "PG::Connection" then Adapters::PG.new(connection) diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index 41b3bdb..3684ebc 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -9,7 +9,7 @@ module Adapters autoload :PG, "que/adapters/pg" autoload :Pond, "que/adapters/pond" autoload :Sequel, "que/adapters/sequel" - autoload :Yugabyte, "que/adapters/yugabyte" + autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock" class UnavailableConnection < StandardError; end diff --git a/lib/que/adapters/yugabyte.rb b/lib/que/adapters/yugabyte.rb deleted file mode 100644 index b6e6787..0000000 --- a/lib/que/adapters/yugabyte.rb +++ /dev/null @@ -1,68 +0,0 @@ -# frozen_string_literal: true - -# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb -module Que - module Adapters - class Yugabyte < Que::Adapters::ActiveRecord - def initialize - super - end - - def checkout_activerecord_adapter(&block) - YugabyteRecord.connection_pool.with_connection(&block) - end - - def checkout_lock_database_connection - # when multiple threads are running we need to make sure - # the acquiring and releasing of advisory locks is done by the - # same connection - Thread.current[:db_connection] ||= LockDatabaseRecord.connection_pool.checkout - end - - def lock_database_connection - Thread.current[:db_connection] - end - - def release_lock_database_connection - LockDatabaseRecord.connection_pool.checkin(Thread.current[:db_connection]) - end - - def execute(command, params=[]) - if command == :lock_job - queue, cursor = params - lock_job_with_lock_database(queue, cursor) - elsif command == :unlock_job - job_id = params[0] - unlock_job(job_id) - else - super(command, params) - end - end - - def lock_job_with_lock_database(queue, cursor) - result = Que.execute(:find_job_to_lock, [queue, cursor]) - return result if result.empty? - - if locked?(result.first['job_id']) - return result - end - - # continue the recursion to fetch the next available job - lock_job_with_lock_database(queue, result.first['job_id']) - end - - def cleanup! - YugabyteRecord.connection_pool.release_connection - LockDatabaseRecord.connection_pool.release_connection - end - - def locked?(job_id) - lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").try(:first)&.fetch('pg_try_advisory_lock') - end - - def unlock_job(job_id) - lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})") - end - end - end -end \ No newline at end of file diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 3f7742b..27e2f90 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -153,7 +153,7 @@ def lock_job_query(queue, cursor) def handle_expired_cursors! @consolidated_queues.each do |queue| - queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) + queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index cb476a4..8c26348 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,7 +13,7 @@ require_relative "./helpers/sleep_job" require_relative "./helpers/interruptible_sleep_job" require_relative "./helpers/user" -require_relative "../lib/que/adapters/yugabyte" + def postgres_now ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"] @@ -44,7 +44,6 @@ class LockDatabaseRecord < ActiveRecord::Base end class YugabyteRecord < ActiveRecord::Base - def self.establish_lock_database_connection establish_connection( adapter: "postgresql", host: ENV.fetch("PGHOST", "localhost"), @@ -52,15 +51,14 @@ def self.establish_lock_database_connection password: ENV.fetch("PGPASSWORD", "password"), database: ENV.fetch("PGDATABASE", "que-test"), ) - end - def self.connection - establish_lock_database_connection.connection - end end # Make sure our test database is prepared to run Que if ENV['YUGABYTE_QUE_WORKER_ENABLED'] - Que.connection = Que::Adapters::Yugabyte + Que.connection = Que::Adapters::ActiveRecordWithLock.new( + job_connection_pool: YugabyteRecord.connection_pool, + lock_connection_pool: LockDatabaseRecord.connection_pool, + ) else Que.connection = ActiveRecord end @@ -91,7 +89,6 @@ def self.connection end config.before do - LockDatabaseRecord.connection_pool.release_connection QueJob.delete_all FakeJob.log = [] ExceptionalJob.log = []