From f29a8b8160555e111c6a9b67e5222e5f49c682e1 Mon Sep 17 00:00:00 2001 From: Ankitha Damodara Date: Tue, 25 Jun 2024 09:25:16 +0100 Subject: [PATCH] add yugabyte adapter --- lib/que.rb | 2 + lib/que/adapters/base.rb | 1 + lib/que/adapters/yugabyte.rb | 75 ++++++++++++++++++++++++++++++++++++ lib/que/locker.rb | 2 +- spec/spec_helper.rb | 48 +++++++++++++++++++---- 5 files changed, 119 insertions(+), 9 deletions(-) create mode 100644 lib/que/adapters/yugabyte.rb diff --git a/lib/que.rb b/lib/que.rb index 08855159..400c10d8 100644 --- a/lib/que.rb +++ b/lib/que.rb @@ -61,6 +61,8 @@ def connection=(connection) self.adapter = if connection.to_s == "ActiveRecord" Adapters::ActiveRecord.new + elsif connection.to_s == "Que::Adapters::Yugabyte" + Adapters::Yugabyte.new else case connection.class.to_s when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection) diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index edf0b92d..41b3bdb7 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -9,6 +9,7 @@ module Adapters autoload :PG, "que/adapters/pg" autoload :Pond, "que/adapters/pond" autoload :Sequel, "que/adapters/sequel" + autoload :Yugabyte, "que/adapters/yugabyte" class UnavailableConnection < StandardError; end diff --git a/lib/que/adapters/yugabyte.rb b/lib/que/adapters/yugabyte.rb new file mode 100644 index 00000000..96811485 --- /dev/null +++ b/lib/que/adapters/yugabyte.rb @@ -0,0 +1,75 @@ +# frozen_string_literal: true + +# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb +module Que + module Adapters + class Yugabyte < Que::Adapters::ActiveRecord + def initialize + super + end + + def checkout_activerecord_adapter(&block) + YugabyteRecord.connection_pool.with_connection(&block) + end + + # def establish_lock_database_connection + # Thread.current["lock_database_connection_#{Thread.current.__id__}"] = LockDatabaseRecord.connection + # end + + # def lock_database_connection + # # connection = @lock_database_connection[Thread.current.name] + # # return connection unless connection.nil? + # # @lock_database_connection[Thread.current.name] = LockDatabaseRecord.connection + # @lock_database_connection ||= LockDatabaseRecord.connection + # end + + def setup_lock_database_connection + ::LockDatabaseRecord.connection + end + + # def execute(command, params=[]) + # if command == :lock_job + # queue, cursor, lock_database_connection = params + # lock_job_with_lock_database(queue, cursor, lock_database_connection) + # elsif command == :unlock_job + # job_id, lock_database_connection = params + # unlock_job(job_id, lock_database_connection) + # else + # super(command, params) + # end + # end + + def lock_job_with_lock_database(queue, cursor, lock_database_connection) + query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count) + .select("extract(epoch from (now() - run_at)) as latency") + .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now) + .where(retryable: true) + .order(:priority, :run_at, :job_id) + .limit(1).to_sql + + result = Que.execute(query) + return result if result.empty? + + if locked?(result.first['job_id'], lock_database_connection) + return result + end + + # continue the recursion to fetch the next available job + lock_job_with_lock_database(queue, result.first['job_id'], lock_database_connection) + end + + def cleanup! + YugabyteRecord.connection_pool.release_connection + LockDatabaseRecord.connection_pool.release_connection + end + + def locked?(job_id, lock_database_connection) + lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"] + end + + def unlock_job(job_id, lock_database_connection) + lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})") + end + end + end +end \ No newline at end of file diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 00df0984..62dae437 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -63,7 +63,7 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue @using_lock_database = ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false) if @using_lock_database - @lock_database_connection = LockDataBaseRecord.connection + @lock_database_connection = LockDatabaseRecord.connection end # Create a bucket that has 100% capacity, so even when we don't apply a limit we diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9570720c..265c9821 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,6 +13,7 @@ require_relative "./helpers/sleep_job" require_relative "./helpers/interruptible_sleep_job" require_relative "./helpers/user" +require_relative "../lib/que/adapters/yugabyte" def postgres_now ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"] @@ -22,20 +23,15 @@ def establish_database_connection ActiveRecord::Base.establish_connection( adapter: "postgresql", host: ENV.fetch("PGHOST", "localhost"), - user: ENV.fetch("PGUSER", "postgres"), - password: ENV.fetch("PGPASSWORD", ""), + user: ENV.fetch("PGUSER", "ubuntu"), + password: ENV.fetch("PGPASSWORD", "password"), database: ENV.fetch("PGDATABASE", "que-test"), ) end establish_database_connection -# Make sure our test database is prepared to run Que -Que.connection = ActiveRecord -Que.migrate! - - -class LockDataBaseRecord < ActiveRecord::Base +class LockDatabaseRecord < ActiveRecord::Base def self.establish_lock_database_connection establish_connection( adapter: "postgresql", @@ -51,10 +47,46 @@ def self.connection end end +class YugabyteRecord < ActiveRecord::Base + def self.establish_lock_database_connection + establish_connection( + adapter: "postgresql", + host: ENV.fetch("PGHOST", "localhost"), + user: ENV.fetch("PGUSER", "ubuntu"), + password: ENV.fetch("PGPASSWORD", "password"), + database: ENV.fetch("PGDATABASE", "que-test"), + ) + end + def self.connection + establish_lock_database_connection.connection + end +end + +# Make sure our test database is prepared to run Que +if ENV['YUGABYTE_QUE_WORKER_ENABLED'] + Que.connection = Que::Adapters::Yugabyte +else + Que.connection = ActiveRecord +end + +Que.migrate! + # Ensure we have a logger, so that we can test the code paths that log Que.logger = Logger.new("/dev/null") + RSpec.configure do |config| + # config.before(:each, :with_yugabyte_adapter) do + # Que.adapter.cleanup! + # Que.connection = Que::Adapters::Yugabyte + # end + + # config.after(:each, :with_yugabyte_adapter) do + # Que.adapter.cleanup! + # Que.connection = ActiveRecord + # end + config.filter_run_when_matching :conditional_test if ENV['YUGABYTE_QUE_WORKER_ENABLED'] + config.before do QueJob.delete_all FakeJob.log = []