Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new adapter to work the job with multiple database #103

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: tests
on:
push:

jobs:
jobs:
rubocop:
runs-on: ubuntu-latest
env:
Expand All @@ -24,7 +24,6 @@ jobs:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]

runs-on: ubuntu-latest
services:
postgres:
Expand All @@ -40,7 +39,6 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 10

env:
PGDATABASE: que-test
PGUSER: ubuntu
Expand All @@ -63,7 +61,6 @@ jobs:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]

runs-on: ubuntu-latest
services:
postgres:
Expand All @@ -79,7 +76,6 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 10

env:
PGDATABASE: que-test
PGUSER: ubuntu
Expand All @@ -96,3 +92,58 @@ jobs:
- name: Run specs
run: |
bundle exec rspec

active_record_with_lock_adapter_rspec:
strategy:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]
runs-on: ubuntu-latest
services:
postgres:
image: postgres:14.2
env:
POSTGRES_DB: que-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 10
lock_database:
image: postgres:14.2
env:
POSTGRES_DB: lock-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
ports:
- 5434:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 10
env:
PGDATABASE: que-test
PGUSER: ubuntu
PGPASSWORD: password
PGHOST: localhost
BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }}
LOCK_PGDATABASE: lock-test
LOCK_PGUSER: ubuntu
LOCK_PGPASSWORD: password
LOCK_PGHOST: localhost
ADAPTER: ActiveRecordWithLock
steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
bundler-cache: true
ruby-version: "${{ matrix.ruby-version }}"
- name: Run Specs With ActiveRecordWithLock Adapter
run: bundle exec rspec

1 change: 1 addition & 0 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def connection=(connection)
Adapters::ActiveRecord.new
else
case connection.class.to_s
when "Que::Adapters::ActiveRecordWithLock" then connection
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
when "PG::Connection" then Adapters::PG.new(connection)
Expand Down
68 changes: 68 additions & 0 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# frozen_string_literal: true

module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
def initialize(job_connection_pool:, lock_connection_pool:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
super
end

def checkout_activerecord_adapter(&block)
checkout_lock_database_connection do
@job_connection_pool.with_connection(&block)
end
end

def checkout_lock_database_connection(&block)
@lock_connection_pool.with_connection(&block)
end

def execute(command, params = [])
case command
when :lock_job
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
when :unlock_job
job_id = params[0]
unlock_job(job_id)
else
super
end
end

def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])

break if result.empty?

cursor = result.first["job_id"]
break if pg_try_advisory_lock?(cursor)
end
result
end

def pg_try_advisory_lock?(job_id)
checkout_lock_database_connection do |conn|
conn.execute(
"SELECT pg_try_advisory_lock(#{job_id})",
).try(:first)&.fetch("pg_try_advisory_lock")
end
end

def unlock_job(job_id)
# If for any reason the connection that is used to get this advisory lock
# is corrupted, the lock on this job_id would already be released when the
# connection holding the lock goes bad.
# Now, if a new connection tries to release the non existing lock this would just no op
# by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock"
checkout_lock_database_connection do |conn|
conn.execute("SELECT pg_advisory_unlock(#{job_id})")
end
ameykusurkar marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
end
1 change: 1 addition & 0 deletions lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Adapters
autoload :PG, "que/adapters/pg"
autoload :Pond, "que/adapters/pond"
autoload :Sequel, "que/adapters/sequel"
autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock"

class UnavailableConnection < StandardError; end

Expand Down
2 changes: 1 addition & 1 deletion lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def with_locked_job
ensure
if job
observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
Que.execute(:unlock_job, [job[:job_id]])
end
end
end
Expand Down
24 changes: 24 additions & 0 deletions lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,30 @@ module Que
WHERE locktype = 'advisory'
) pg USING (job_id)
},

unlock_job: %{
SELECT pg_advisory_unlock($1)
},

find_job_to_lock: %{
SELECT
queue,
priority,
run_at,
job_id,
job_class,
retryable,
args,
error_count,
extract(epoch from (now() - run_at)) as latency
FROM que_jobs
WHERE queue = $1::text
AND run_at <= now()
AND retryable = true
AND job_id >= $2
ORDER BY priority, run_at, job_id
LIMIT 1
ameykusurkar marked this conversation as resolved.
Show resolved Hide resolved
},
}
# rubocop:enable Style/MutableConstant
end
30 changes: 30 additions & 0 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "postgres"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
pool: 5,
)
end

class JobRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

def active_record_with_lock_adapter_connection
Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: JobRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool,
)
end
2 changes: 1 addition & 1 deletion spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def expect_to_work(job)
with_locked_job do |actual_job|
expect(actual_job[:job_id]).to eql(job[:job_id])
expect(Que).to receive(:execute).
with("SELECT pg_advisory_unlock($1)", [job[:job_id]])
with(:unlock_job, [job[:job_id]])

# Destroy the job to simulate the behaviour of the queue, and allow our lock query
# to discover new jobs.
Expand Down
12 changes: 9 additions & 3 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require_relative "helpers/sleep_job"
require_relative "helpers/interruptible_sleep_job"
require_relative "helpers/user"
require_relative "active_record_with_lock_spec_helper"

def postgres_now
ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"]
Expand All @@ -22,16 +23,21 @@ def establish_database_connection
ActiveRecord::Base.establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "postgres"),
password: ENV.fetch("PGPASSWORD", ""),
user: ENV.fetch("PGUSER", "ubuntu"),
ameykusurkar marked this conversation as resolved.
Show resolved Hide resolved
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

establish_database_connection

# Make sure our test database is prepared to run Que
Que.connection = ActiveRecord
Que.connection =
case ENV["ADAPTER"]
when "ActiveRecordWithLock" then active_record_with_lock_adapter_connection
else ActiveRecord
end

Que.migrate!

# Ensure we have a logger, so that we can test the code paths that log
Expand Down