Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jul 4, 2024
1 parent 1569cba commit 30ee504
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 45 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ jobs:
- name: Run specs
run: |
bundle exec rspec
- name: Run Specs With YUAGBUTE_QUE_WORKER_ENABLED Enabled
run: YUAGBUTE_QUE_WORKER_ENABLED=true bundle exec rspec
- name: Run Specs With ActiveRecordWithLock Adapter
run: ADAPTER="ActiveRecordWithLock" bundle exec rspec
7 changes: 1 addition & 6 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,7 @@ def connection=(connection)
Adapters::ActiveRecord.new
else
case connection.class.to_s
when "Que::Adapters::ActiveRecordWithLock" then

Adapters::ActiveRecordWithLock.new(
job_connection_pool: connection.job_connection_pool,
lock_connection: connection.lock_connection
)
when "Que::Adapters::ActiveRecordWithLock" then connection
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
when "PG::Connection" then Adapters::PG.new(connection)
Expand Down
14 changes: 9 additions & 5 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
attr_accessor :job_connection_pool, :lock_connection
def initialize(job_connection_pool:, lock_connection:)
attr_accessor :job_connection_pool, :lock_record
def initialize(job_connection_pool:, lock_record:)
@job_connection_pool = job_connection_pool
@lock_connection = lock_connection
@lock_record = lock_record
super
end

Expand All @@ -16,7 +16,11 @@ def checkout_activerecord_adapter(&block)
end

def lock_database_connection
Thread.current[:db_connection] ||= @lock_connection.connection
# We are storing this in thread variable here to make sure
# same connection is used to acquire and release the advisory locks.
# Advisory lock will not be released if any other connection from the
# pool tries to release the lock
Thread.current[:db_connection] ||= @lock_record.connection
end

def execute(command, params=[])
Expand Down Expand Up @@ -47,7 +51,7 @@ def lock_job_with_lock_database(queue, cursor)

def cleanup!
@job_connection_pool.release_connection
@lock_connection.release_connection
@lock_record.release_connection
end

def pg_try_advisory_lock?(job_id)
Expand Down
28 changes: 28 additions & 0 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "ubuntu"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5435),
pool: 6,
)
end

class JobRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

def active_record_with_lock_adapter_connection
Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: JobRecord.connection_pool,
lock_record: LockDatabaseRecord,
)
end
1 change: 0 additions & 1 deletion spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ def expect_to_lock_with(cursor:)
expect_to_work(job_2)

@epoch += (cursor_expiry) # our cursor should now expire
# puts @epoch
expect_to_lock_with(cursor: 0)
expect_to_work(job_3)
end
Expand Down
1 change: 0 additions & 1 deletion spec/lib/que/middleware/queue_collector_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
it "does not include old results that are no longer relevant" do
# Populate metrics, check we have some counts
collector.call({})
puts QueJob.all
QueJob.delete_all

collector.call({})
Expand Down
36 changes: 6 additions & 30 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require_relative "./helpers/sleep_job"
require_relative "./helpers/interruptible_sleep_job"
require_relative "./helpers/user"
require_relative "./active_record_with_lock_spec_helper"


def postgres_now
Expand All @@ -31,37 +32,12 @@ def establish_database_connection

establish_database_connection


class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "ubuntu"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5435),
pool: 6,)
end

class YugabyteRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

# Make sure our test database is prepared to run Que
if ENV['YUGABYTE_QUE_WORKER_ENABLED']
Que.connection = Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: YugabyteRecord.connection_pool,
lock_connection: LockDatabaseRecord,
)
else
Que.connection = ActiveRecord
end
Que.connection =
case ENV["ADAPTER"]
when "ActiveRecordWithLock" then active_record_with_lock_adapter_connection
else ActiveRecord
end

Que.migrate!

Expand Down

0 comments on commit 30ee504

Please sign in to comment.