Skip to content

Commit

Permalink
remove connection pool for lock connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jun 28, 2024
1 parent e302210 commit 2f55748
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 41 deletions.
2 changes: 1 addition & 1 deletion lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def connection=(connection)

Adapters::ActiveRecordWithLock.new(
job_connection_pool: connection.job_connection_pool,
lock_connection_pool: connection.lock_connection_pool
lock_connection: connection.lock_connection
)
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
Expand Down
19 changes: 4 additions & 15 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,19 @@
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
attr_accessor :job_connection_pool, :lock_connection_pool
def initialize(job_connection_pool:, lock_connection_pool:)
attr_accessor :job_connection_pool, :lock_connection
def initialize(job_connection_pool:, lock_connection:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
@lock_connection = lock_connection
super
end

def checkout_activerecord_adapter(&block)
@job_connection_pool.with_connection(&block)
end

def checkout_lock_database_connection
# when multiple threads are running we need to make sure
# the acquiring and releasing of advisory locks is done by the
# same connection
Thread.current[:db_connection] ||= lock_connection_pool.checkout
end

def lock_database_connection
Thread.current[:db_connection]
end

def release_lock_database_connection
@lock_connection_pool.checkin(Thread.current[:db_connection])
Thread.current[:db_connection] ||= @lock_connection.connection
end

def execute(command, params=[])
Expand Down
9 changes: 3 additions & 6 deletions lib/que/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ def initialize(
cursor_expiry: lock_cursor_expiry,
window: lock_window,
budget: lock_budget,
secondary_queues: secondary_queues
secondary_queues: secondary_queues,
)
end

attr_reader :metrics

def work_loop
return if @stop
Que.adapter.checkout_lock_database_connection if Que.adapter.class == Que::Adapters::ActiveRecordWithLock

@tracer.trace(RunningSecondsTotal, queue: @queue, primary_queue: @queue) do
loop do
case event = work
Expand All @@ -160,10 +160,7 @@ def work_loop
nil # immediately find a new job to work
end

if @stop
Que.adapter.release_lock_database_connection if Que.adapter.class == Que::Adapters::ActiveRecordWithLock
break
end
break if @stop
end
end
ensure
Expand Down
20 changes: 1 addition & 19 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class YugabyteRecord < ActiveRecord::Base
if ENV['YUGABYTE_QUE_WORKER_ENABLED']
Que.connection = Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: YugabyteRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool,
lock_connection: LockDatabaseRecord,
)
else
Que.connection = ActiveRecord
Expand All @@ -70,24 +70,6 @@ class YugabyteRecord < ActiveRecord::Base


RSpec.configure do |config|
# config.before(:each, :with_yugabyte_adapter) do
# Que.adapter.cleanup!
# Que.connection = Que::Adapters::Yugabyte
# end

# config.after(:each, :with_yugabyte_adapter) do
# Que.adapter.cleanup!
# Que.connection = ActiveRecord
# end
if ENV['YUGABYTE_QUE_WORKER_ENABLED']
config.before(:all) do
Que.adapter.checkout_lock_database_connection
end
config.after(:all) do
LockDatabaseRecord.connection_pool.disconnect!
end
end

config.before do
QueJob.delete_all
FakeJob.log = []
Expand Down

0 comments on commit 2f55748

Please sign in to comment.