From 38cf6c7927c596295b53902d145356d5b6721322 Mon Sep 17 00:00:00 2001 From: Ankitha Damodara Date: Tue, 25 Jun 2024 09:25:16 +0100 Subject: [PATCH] add yugabyte adapter --- lib/que/adapters/base.rb | 1 + lib/que/adapters/yugabyte.rb | 75 ++++++++++++++++++++++++++++++++++++ spec/spec_helper.rb | 48 +++++++++++++++++++---- 3 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 lib/que/adapters/yugabyte.rb diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index edf0b92..41b3bdb 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -9,6 +9,7 @@ module Adapters autoload :PG, "que/adapters/pg" autoload :Pond, "que/adapters/pond" autoload :Sequel, "que/adapters/sequel" + autoload :Yugabyte, "que/adapters/yugabyte" class UnavailableConnection < StandardError; end diff --git a/lib/que/adapters/yugabyte.rb b/lib/que/adapters/yugabyte.rb new file mode 100644 index 0000000..9681148 --- /dev/null +++ b/lib/que/adapters/yugabyte.rb @@ -0,0 +1,75 @@ +# frozen_string_literal: true + +# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb +module Que + module Adapters + class Yugabyte < Que::Adapters::ActiveRecord + def initialize + super + end + + def checkout_activerecord_adapter(&block) + YugabyteRecord.connection_pool.with_connection(&block) + end + + # def establish_lock_database_connection + # Thread.current["lock_database_connection_#{Thread.current.__id__}"] = LockDatabaseRecord.connection + # end + + # def lock_database_connection + # # connection = @lock_database_connection[Thread.current.name] + # # return connection unless connection.nil? + # # @lock_database_connection[Thread.current.name] = LockDatabaseRecord.connection + # @lock_database_connection ||= LockDatabaseRecord.connection + # end + + def setup_lock_database_connection + ::LockDatabaseRecord.connection + end + + # def execute(command, params=[]) + # if command == :lock_job + # queue, cursor, lock_database_connection = params + # lock_job_with_lock_database(queue, cursor, lock_database_connection) + # elsif command == :unlock_job + # job_id, lock_database_connection = params + # unlock_job(job_id, lock_database_connection) + # else + # super(command, params) + # end + # end + + def lock_job_with_lock_database(queue, cursor, lock_database_connection) + query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count) + .select("extract(epoch from (now() - run_at)) as latency") + .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now) + .where(retryable: true) + .order(:priority, :run_at, :job_id) + .limit(1).to_sql + + result = Que.execute(query) + return result if result.empty? + + if locked?(result.first['job_id'], lock_database_connection) + return result + end + + # continue the recursion to fetch the next available job + lock_job_with_lock_database(queue, result.first['job_id'], lock_database_connection) + end + + def cleanup! + YugabyteRecord.connection_pool.release_connection + LockDatabaseRecord.connection_pool.release_connection + end + + def locked?(job_id, lock_database_connection) + lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"] + end + + def unlock_job(job_id, lock_database_connection) + lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})") + end + end + end +end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9570720..265c982 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,6 +13,7 @@ require_relative "./helpers/sleep_job" require_relative "./helpers/interruptible_sleep_job" require_relative "./helpers/user" +require_relative "../lib/que/adapters/yugabyte" def postgres_now ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"] @@ -22,20 +23,15 @@ def establish_database_connection ActiveRecord::Base.establish_connection( adapter: "postgresql", host: ENV.fetch("PGHOST", "localhost"), - user: ENV.fetch("PGUSER", "postgres"), - password: ENV.fetch("PGPASSWORD", ""), + user: ENV.fetch("PGUSER", "ubuntu"), + password: ENV.fetch("PGPASSWORD", "password"), database: ENV.fetch("PGDATABASE", "que-test"), ) end establish_database_connection -# Make sure our test database is prepared to run Que -Que.connection = ActiveRecord -Que.migrate! - - -class LockDataBaseRecord < ActiveRecord::Base +class LockDatabaseRecord < ActiveRecord::Base def self.establish_lock_database_connection establish_connection( adapter: "postgresql", @@ -51,10 +47,46 @@ def self.connection end end +class YugabyteRecord < ActiveRecord::Base + def self.establish_lock_database_connection + establish_connection( + adapter: "postgresql", + host: ENV.fetch("PGHOST", "localhost"), + user: ENV.fetch("PGUSER", "ubuntu"), + password: ENV.fetch("PGPASSWORD", "password"), + database: ENV.fetch("PGDATABASE", "que-test"), + ) + end + def self.connection + establish_lock_database_connection.connection + end +end + +# Make sure our test database is prepared to run Que +if ENV['YUGABYTE_QUE_WORKER_ENABLED'] + Que.connection = Que::Adapters::Yugabyte +else + Que.connection = ActiveRecord +end + +Que.migrate! + # Ensure we have a logger, so that we can test the code paths that log Que.logger = Logger.new("/dev/null") + RSpec.configure do |config| + # config.before(:each, :with_yugabyte_adapter) do + # Que.adapter.cleanup! + # Que.connection = Que::Adapters::Yugabyte + # end + + # config.after(:each, :with_yugabyte_adapter) do + # Que.adapter.cleanup! + # Que.connection = ActiveRecord + # end + config.filter_run_when_matching :conditional_test if ENV['YUGABYTE_QUE_WORKER_ENABLED'] + config.before do QueJob.delete_all FakeJob.log = []