Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amey/add lock database #109

Closed
wants to merge 14 commits into from
21 changes: 20 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,30 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 10
lock_database:
image: postgres:14.2
env:
POSTGRES_DB: lock-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
ports:
- 5434:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 10

env:
PGDATABASE: que-test
PGUSER: ubuntu
PGPASSWORD: password
PGHOST: localhost
BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }}
LOCK_PGDATABASE: lock-test
LOCK_PGUSER: ubuntu
LOCK_PGPASSWORD: password
LOCK_PGHOST: localhost
steps:
- uses: actions/checkout@v3
- name: Set up Ruby
Expand All @@ -41,4 +58,6 @@ jobs:
ruby-version: "${{ matrix.ruby-version }}"
- name: Run specs
run: |
bundle exec rspec
bundle exec rspec
- name: Run Specs With ActiveRecordWithLock Adapter
run: ADAPTER="ActiveRecordWithLock" bundle exec rspec
1 change: 1 addition & 0 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def connection=(connection)
Adapters::ActiveRecord.new
else
case connection.class.to_s
when "Que::Adapters::ActiveRecordWithLock" then connection
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
when "PG::Connection" then Adapters::PG.new(connection)
Expand Down
73 changes: 73 additions & 0 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
LOCK_PREFIX = ENV["QUE_LOCK_PREFIX"] || 1111 # this is a random number

def initialize(job_connection_pool:, lock_record:)
@job_connection_pool = job_connection_pool
@lock_record = lock_record
super
end

def checkout_activerecord_adapter(&block)
@lock_record.connection_pool.with_connection do
@job_connection_pool.with_connection(&block)
end
end

def lock_database_connection
@lock_record.connection
end

def execute(command, params = [])
case command
when :lock_job
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
when :unlock_job
job_id = params[0]
unlock_job(job_id)
else
super(command, params)
end
end

def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])
break if result.empty?

cursor = result.first["job_id"]
break if pg_try_advisory_lock?(cursor)
end
result
end

def cleanup!
@job_connection_pool.release_connection
@lock_record.remove_connection
end

def pg_try_advisory_lock?(job_id)
lock_variable = "#{LOCK_PREFIX}#{job_id}".to_i
lock_database_connection.execute(
"SELECT pg_try_advisory_lock(#{lock_variable})",
).try(:first)&.fetch("pg_try_advisory_lock")
end

def unlock_job(job_id)
lock_variable = "#{LOCK_PREFIX}#{job_id}".to_i
# If for any reason the connection that is used to get this advisory lock
# is corrupted, the lock on this job_id would already be released when the
# connection holding the lock goes bad.
# Now, if a new connection tries to release the non existing lock this would just no op
# by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock"
lock_database_connection.execute("SELECT pg_advisory_unlock(#{lock_variable})")
end
end
end
end
1 change: 1 addition & 0 deletions lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Adapters
autoload :PG, "que/adapters/pg"
autoload :Pond, "que/adapters/pond"
autoload :Sequel, "que/adapters/sequel"
autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock"

class UnavailableConnection < StandardError; end

Expand Down
3 changes: 1 addition & 2 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue
@queue_expires_at = {}
@secondary_queues = secondary_queues
@consolidated_queues = Array.wrap(queue).concat(secondary_queues)

# Create a bucket that has 100% capacity, so even when we don't apply a limit we
# have a valid bucket that we can use everywhere
@leaky_bucket = LeakyBucket.new(window: window || 1.0, budget: budget || 1.0)
Expand Down Expand Up @@ -121,7 +120,7 @@ def with_locked_job
ensure
if job
observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
Que.execute(:unlock_job, [job[:job_id]])
end
end
end
Expand Down
24 changes: 24 additions & 0 deletions lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,30 @@ module Que
WHERE locktype = 'advisory'
) pg USING (job_id)
},

unlock_job: %{
SELECT pg_advisory_unlock($1)
},

find_job_to_lock: %{
SELECT
queue,
priority,
run_at,
job_id,
job_class,
retryable,
args,
error_count,
extract(epoch from (now() - run_at)) as latency
FROM que_jobs
WHERE queue = $1::text
AND run_at <= now()
AND retryable = true
AND job_id >= $2
ORDER BY priority, run_at, job_id
LIMIT 1
}
}
# rubocop:enable Style/MutableConstant
end
28 changes: 28 additions & 0 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "ubuntu"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
pool: 6,
)
end

class JobRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

def active_record_with_lock_adapter_connection
Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: JobRecord.connection_pool,
lock_record: LockDatabaseRecord,
)
end
2 changes: 1 addition & 1 deletion spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def expect_to_work(job)
with_locked_job do |actual_job|
expect(actual_job[:job_id]).to eql(job[:job_id])
expect(Que).to receive(:execute).
with("SELECT pg_advisory_unlock($1)", [job[:job_id]])
with(:unlock_job, [job[:job_id]])

# Destroy the job to simulate the behaviour of the queue, and allow our lock query
# to discover new jobs.
Expand Down
14 changes: 11 additions & 3 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
require_relative "./helpers/sleep_job"
require_relative "./helpers/interruptible_sleep_job"
require_relative "./helpers/user"
require_relative "./active_record_with_lock_spec_helper"


def postgres_now
ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"]
Expand All @@ -22,21 +24,27 @@ def establish_database_connection
ActiveRecord::Base.establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "postgres"),
password: ENV.fetch("PGPASSWORD", ""),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

establish_database_connection

# Make sure our test database is prepared to run Que
Que.connection = ActiveRecord
Que.connection =
case ENV["ADAPTER"]
when "ActiveRecordWithLock" then active_record_with_lock_adapter_connection
else ActiveRecord
end

Que.migrate!

# Ensure we have a logger, so that we can test the code paths that log
Que.logger = Logger.new("/dev/null")


RSpec.configure do |config|
config.before do
QueJob.delete_all
Expand Down
Loading