diff --git a/.circleci/config.yml b/.circleci/config.yml
index ba26bc4..4e23a45 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -34,11 +34,19 @@ jobs:
           PGUSER: ubuntu
           PGPASSWORD: password
           PGHOST: localhost
+          LOCK_PGDATABASE: lock-test
+          LOCK_PGUSER: ubuntu
+          LOCK_PGPASSWORD: password
       - image: postgres:11.2
         environment:
           POSTGRES_DB: que-test
           POSTGRES_USER: ubuntu
           POSTGRES_PASSWORD: password
+      - image: postgres:11.2
+        environment:
+          POSTGRES_DB: lock-test
+          POSTGRES_USER: ubuntu
+          POSTGRES_PASSWORD: password
     steps:
       - add_ssh_keys
       - checkout
diff --git a/bin/que b/bin/que
index dab3032..7766431 100755
--- a/bin/que
+++ b/bin/que
@@ -67,6 +67,10 @@ OptionParser.new do |opts|
     $stdout.puts opts
     exit 0
   end
+
+  opts.on("--using-lock-database [USING_LOCK_DATABASE]", FalseClass, "use a separate database for advisory locking") do |using_lock_database|
+    options.using_lock_database = using_lock_database
+  end
 end.parse!(ARGV)
 # rubocop:enable Metrics/LineLength

@@ -93,6 +97,7 @@
 cursor_expiry = options.cursor_expiry || wake_interval
 worker_count = options.worker_count || 1
 timeout = options.timeout
 secondary_queues = options.secondary_queues || []
+using_lock_database = options.using_lock_database || false

 Que.logger ||= Logger.new(STDOUT)
@@ -118,6 +123,7 @@ worker_group = Que::WorkerGroup.start(
   lock_window: options.lock_window,
   lock_budget: options.lock_budget,
   secondary_queues: secondary_queues,
+  using_lock_database: using_lock_database,
 )

 if options.metrics_port
diff --git a/lib/que/locker.rb b/lib/que/locker.rb
index 43f3cc3..c3b23b9 100644
--- a/lib/que/locker.rb
+++ b/lib/que/locker.rb
@@ -53,13 +53,18 @@ class Locker
       ),
     ].freeze

-    def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queues: [])
+    def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queues: [], using_lock_database: false)
       @queue = queue
       @cursor_expiry = cursor_expiry
       @queue_cursors = {}
       @queue_expires_at = {}
       @secondary_queues = secondary_queues
       @consolidated_queues = Array.wrap(queue).concat(secondary_queues)
+      @using_lock_database = using_lock_database
+
+      if @using_lock_database
+        @lock_database_connection = LockDataBaseRecord.connection
+      end

       # Create a bucket that has 100% capacity, so even when we don't apply a limit we
       # have a valid bucket that we can use everywhere
@@ -121,7 +126,11 @@ def with_locked_job
     ensure
       if job
         observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
-          Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
+          if @using_lock_database
+            @lock_database_connection.execute("SELECT pg_advisory_unlock(#{job["job_id"]})")
+          else
+            Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
+          end
         end
       end
     end
@@ -149,7 +158,31 @@ def exists?(job)
     end

     def lock_job_query(queue, cursor)
-      Que.execute(:lock_job, [queue, cursor]).first
+      if @using_lock_database
+        lock_job_with_lock_database(queue, cursor)
+      else
+        Que.execute(:lock_job, [queue, cursor]).first
+      end
+    end
+
+    def lock_job_with_lock_database(queue, cursor)
+      query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
+                    .select("extract(epoch from (now() - run_at)) as latency")
+                    .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
+                    .where(retryable: true)
+                    .order(:priority, :run_at, :job_id)
+                    .limit(1).to_sql
+      result = Que.execute(query).first
+      return result if result.nil?
+
+      return result if locked?(result["job_id"])
+
+      # Continue the recursion to fetch the next available job
+      lock_job_with_lock_database(queue, result["job_id"])
+    end
+
+    def locked?(job_id)
+      @lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"]
+    end

     def handle_expired_cursors!
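
The new lock_job_with_lock_database path selects a candidate row on the application database, takes a session-level advisory lock for its job_id on the separate lock connection, and recurses past rows whose lock is already held by another worker; with_locked_job then releases that lock on the same connection once the job has been worked. A minimal sketch of that round trip, using the pg gem directly rather than the patch's ActiveRecord connection; the connection parameters and job id below are placeholders, not values from this patch:

require "pg"

job_id  = 42                                               # placeholder job id
lock_db = PG.connect(dbname: "lock-test", user: "postgres") # hypothetical credentials

# pg_try_advisory_lock returns t/f; only work the job when we won the lock.
locked = lock_db.exec("SELECT pg_try_advisory_lock(#{Integer(job_id)})") do |result|
  result.getvalue(0, 0) == "t"
end

if locked
  begin
    puts "work job #{job_id} against the primary database here"
  ensure
    # Session-level advisory locks must be released explicitly, on the same connection
    # that acquired them.
    lock_db.exec("SELECT pg_advisory_unlock(#{Integer(job_id)})")
  end
end

Because advisory locks belong to the session that took them, the unlock has to go through the same @lock_database_connection that issued pg_try_advisory_lock, which is why both calls above run on lock_db.
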
diff --git a/lib/que/worker.rb b/lib/que/worker.rb
index a511e0e..d90dd0c 100644
--- a/lib/que/worker.rb
+++ b/lib/que/worker.rb
@@ -121,7 +121,8 @@ def initialize(
       lock_cursor_expiry: DEFAULT_WAKE_INTERVAL,
       lock_window: nil,
       lock_budget: nil,
-      secondary_queues: []
+      secondary_queues: [],
+      using_lock_database: false
     )
       @queue = queue
       @wake_interval = wake_interval
@@ -135,6 +136,7 @@ def initialize(
         window: lock_window,
         budget: lock_budget,
         secondary_queues: secondary_queues,
+        using_lock_database: using_lock_database,
       )
     end

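
With the plumbing above, the flag reaches the Locker through Que::WorkerGroup.start and Que::Worker, either programmatically or via the new bin/que --using-lock-database flag. A usage sketch based on the calls the integration spec below makes; the worker count, wake interval, and queue name are illustrative, and a LockDataBaseRecord class providing the lock connection (as defined in spec/spec_helper.rb further down) is assumed to exist:

require "que/worker" # required to prevent autoload races, as in the specs

worker_group = Que::WorkerGroup.start(
  2,                                 # number of workers
  wake_interval: 5,
  secondary_queues: ["non-default"],
  using_lock_database: true,         # advisory locks go to the separate lock database
)

# ... on shutdown, give in-flight jobs up to 30s to finish
worker_group.stop(30)
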
diff --git a/spec/integration/integration_with_lock_database_spec.rb b/spec/integration/integration_with_lock_database_spec.rb
new file mode 100644
index 0000000..57cd950
--- /dev/null
+++ b/spec/integration/integration_with_lock_database_spec.rb
@@ -0,0 +1,154 @@
+# frozen_string_literal: true
+
+require "spec_helper"
+require "que/worker" # required to prevent autoload races
+
+RSpec.describe "multiple workers" do
+  def with_workers(num, stop_timeout: 5, secondary_queues: [], &block)
+    Que::WorkerGroup.start(
+      num,
+      wake_interval: 0.01,
+      secondary_queues: secondary_queues,
+      using_lock_database: true,
+    ).tap(&block).stop(stop_timeout)
+  end
+
+  # Wait for a maximum of [timeout] seconds for all jobs to be worked
+  def wait_for_jobs_to_be_worked(timeout: 10)
+    start = Time.now
+    loop do
+      break if QueJob.count == 0 || Time.now - start > timeout
+
+      sleep 0.1
+    end
+  end
+
+  context "with one worker and many jobs" do
+    it "works each job exactly once" do
+      QueJob.delete_all
+      10.times.each { |i| FakeJob.enqueue(i) }
+
+      expect(QueJob.count).to eq(10)
+
+      with_workers(1) { wait_for_jobs_to_be_worked }
+
+      expect(QueJob.count).to eq(0)
+      expect(FakeJob.log.count).to eq(10)
+    end
+  end
+
+  context "with a job on a non-default queue" do
+    context "with exclusive workers" do
+      it "does not work the job on the non-default queue" do
+        FakeJob.enqueue(1, queue: "default")
+        FakeJob.enqueue(2, queue: "non-default")
+
+        expect(QueJob.count).to eq(2)
+
+        with_workers(1) { wait_for_jobs_to_be_worked(timeout: 1) }
+
+        expect(QueJob.count).to eq(1)
+        expect(FakeJob.log.count).to eq(1)
+      end
+    end
+
+    context "with permissive workers" do
+      it "works each job exactly once" do
+        FakeJob.enqueue(1, queue: "default")
+        FakeJob.enqueue(2, queue: "non-default")
+
+        expect(QueJob.count).to eq(2)
+
+        with_workers(1, secondary_queues: ["non-default"]) do
+          wait_for_jobs_to_be_worked(timeout: 1)
+        end
+
+        expect(QueJob.count).to eq(0)
+        expect(FakeJob.log.count).to eq(2)
+      end
+
+      it "works jobs for defined secondary_queues only" do
+        FakeJob.enqueue(1, queue: "default")
+        FakeJob.enqueue(2, queue: "non-default")
+        FakeJob.enqueue(3, queue: "not-worked")
+
+        expect(QueJob.count).to eq(3)
+
+        with_workers(1, secondary_queues: ["non-default"]) do
+          wait_for_jobs_to_be_worked(timeout: 1)
+        end
+
+        expect(QueJob.count).to eq(1)
+        expect(FakeJob.log.count).to eq(2)
+      end
+    end
+  end
+
+  context "with multiple workers contending over the same job" do
+    it "works that job exactly once" do
+      FakeJob.enqueue(1)
+
+      expect(QueJob.count).to eq(1)
+
+      with_workers(5) { wait_for_jobs_to_be_worked }
+
+      expect(QueJob.count).to eq(0)
+      expect(FakeJob.log.count).to eq(1)
+    end
+  end
+
+  context "with multiple jobs" do
+    around do |example|
+      ActiveRecord::Base.connection.execute(
+        "CREATE TABLE IF NOT EXISTS users ( name text )",
+      )
+      User.delete_all
+      example.run
+      ActiveRecord::Base.connection.execute("DROP TABLE users")
+    end
+
+    it "works them all exactly once" do
+      CreateUser.enqueue("alice")
+      CreateUser.enqueue("bob")
+      CreateUser.enqueue("charlie")
+
+      expect(QueJob.count).to eq(3)
+
+      with_workers(5) { wait_for_jobs_to_be_worked }
+
+      expect(QueJob.count).to eq(0)
+      expect(User.count).to eq(3)
+      expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie])
+    end
+  end
+
+  context "with jobs that exceed stop timeout" do
+    it "raises Que::JobTimeoutError" do
+      SleepJob.enqueue(5) # sleep 5s
+
+      # Sleep to let the worker pick up the SleepJob, then stop the worker with an
+      # aggressive timeout. This should cause JobTimeout to be raised in the worker
+      # thread.
+      with_workers(1, stop_timeout: 0.01) { sleep 0.1 }
+
+      sleep_job = QueJob.last
+
+      expect(sleep_job).to_not be_nil
+      expect(sleep_job.last_error).to match(/Job exceeded timeout when requested to stop/)
+    end
+
+    context "but is interruptible" do
+      it "terminates gracefully" do
+        # Sleep for 0.2s before checking if it should continue
+        InterruptibleSleepJob.enqueue(0.2)
+
+        # Sleep 0.1s to let the worker pick up the SleepJob, then stop the worker with
+        # a long enough timeout to let an iteration of sleep complete.
+        with_workers(1, stop_timeout: 0.3) { sleep 0.1 }
+
+        expect(QueJob.count).to eq(0)
+        expect(InterruptibleSleepJob.log.count).to eq(1)
+      end
+    end
+  end
+end
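
While one of these workers holds a job, the advisory lock should be visible on the lock database rather than the application database. A quick way to check during a test run, assuming the LockDataBaseRecord helper added to spec/spec_helper.rb below:

# Session-level advisory locks show up in pg_locks on the database of the connection
# that took them. For pg_try_advisory_lock(bigint), the low 32 bits of the job id
# appear in the objid column.
rows = LockDataBaseRecord.connection.execute(
  "SELECT classid, objid, granted FROM pg_locks WHERE locktype = 'advisory'",
)
rows.to_a.each { |row| puts row.inspect }
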
diff --git a/spec/lib/que/locker_spec.rb b/spec/lib/que/locker_spec.rb
index 08cc4a7..5b4066d 100644
--- a/spec/lib/que/locker_spec.rb
+++ b/spec/lib/que/locker_spec.rb
@@ -12,6 +12,7 @@
   let(:queue) { "default" }
   let(:cursor_expiry) { 0 }
+  let(:cursor) { 0 }

   describe ".with_locked_job" do
     before { allow(Que).to receive(:execute).and_call_original }

@@ -44,12 +45,12 @@ def expect_to_work(job)

     # Our tests are very concerned with which cursor we use and when
     def expect_to_lock_with(cursor:)
-      expect(Que).to receive(:execute).with(:lock_job, [queue, cursor])
+      expect(Que).to receive(:execute).with(:lock_job, [queue, cursor]).and_call_original
     end

     context "with no jobs to lock" do
       it "scans entire table and calls block with nil job" do
-        expect(Que).to receive(:execute).with(:lock_job, [queue, 0])
+        expect(Que).to receive(:execute).with(:lock_job, [queue, 0]).and_call_original

         with_locked_job do |job|
           expect(job).to be_nil
@@ -121,4 +122,145 @@ def expect_to_lock_with(cursor:)
       # rubocop:enable RSpec/InstanceVariable
     end
   end
+
+  context "when using lock database" do
+    subject(:locker) do
+      described_class.new(
+        queue: queue,
+        cursor_expiry: cursor_expiry,
+        using_lock_database: true,
+      )
+    end
+
+    let(:sql) do
+      QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
+            .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
+            .where(retryable: true)
+            .order(:priority, :run_at, :job_id)
+            .limit(1).to_sql
+    end
+    before do
+      allow(Time).to receive(:now).and_return(Time.now + 1)
+    end
+
+    describe ".with_locked_job" do
+      before { allow(Que).to receive(:execute).and_call_original }
+
+      # Helper to call the with_locked_job method but ensure our block has actually been
+      # called. Without this, it's possible that we'd never run expectations in our block.
+      def with_locked_job
+        block_called = false
+        locker.with_locked_job do |job|
+          yield(job)
+          block_called = true
+        end
+
+        raise "did not call job block" unless block_called
+      end
+
+      # Simulates actual working of a job, which is useful to these tests to free up another
+      # job for locking.
+      def expect_to_work(job)
+        with_locked_job do |actual_job|
+          expect(actual_job[:job_id]).to eql(job[:job_id])
+          expect(locker.instance_variable_get(:@lock_database_connection)).to receive(:execute).
+            with("SELECT pg_advisory_unlock(#{job[:job_id]})")
+
+          # Destroy the job to simulate the behaviour of the queue, and allow our lock query
+          # to discover new jobs.
+          QueJob.find(job[:job_id]).destroy!
+        end
+      end
+      def query(cursor = 0)
+        QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
+              .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
+              .where(retryable: true)
+              .order(:priority, :run_at, :job_id)
+              .limit(1).to_sql
+      end
+      # Our tests are very concerned with which cursor we use and when
+      def expect_to_lock_with(cursor:, locked_job: nil)
+        expect(Que).to receive(:execute).with(query(cursor)).and_call_original
+
+        if locked_job
+          expect(locker.instance_variable_get(:@lock_database_connection)).to receive(:execute).
+            with("SELECT pg_try_advisory_lock(#{locked_job[:job_id]})").and_call_original
+        end
+      end
+
+      context "with no jobs to lock" do
+        it "scans entire table and calls block with nil job" do
+          expect(Que).to receive(:execute).with(query).and_call_original
+
+          with_locked_job do |job|
+            expect(job).to be_nil
+          end
+        end
+      end
+
+      context "with just one job to lock" do
+        let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs }
+        let(:cursor_expiry) { 60 }
+
+        # Pretend time isn't moving, as we don't want to test cursor expiry here
+        before { allow(Process).to receive(:clock_gettime).and_return(0) }
+
+        # We want our workers to start from the front of the queue immediately after finding
+        # no jobs are available to work.
+        it "will use a cursor until no jobs are found" do
+          expect_to_lock_with(cursor: 0, locked_job: job_1)
+          expect_to_work(job_1)
+
+          expect_to_lock_with(cursor: job_1[:job_id])
+          with_locked_job {}
+
+          expect_to_lock_with(cursor: 0)
+          with_locked_job {}
+        end
+      end
+
+      context "with jobs to lock" do
+        let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs }
+        let!(:job_2) { FakeJob.enqueue(2, queue: queue, priority: 2).attrs }
+        let!(:job_3) { FakeJob.enqueue(3, queue: queue, priority: 3).attrs }
+
+        it "locks and then unlocks the most important job" do
+          expect_to_lock_with(cursor: 0, locked_job: job_1)
+          expect_to_work(job_1)
+        end
+
+        # rubocop:disable RSpec/SubjectStub
+        # rubocop:disable RSpec/InstanceVariable
+        context "on subsequent locks" do
+          context "with non-zero cursor expiry" do
+            let(:cursor_expiry) { 5 }
+
+            before { allow(locker).to receive(:monotonic_now) { @epoch } }
+
+            # This test simulates the repeated locking of jobs. We're trying to prove that
+            # the locker will use the previous job's ID as a cursor until the expiry has
+            # elapsed, after which we'll reset.
+            #
+            # We do this by expecting on the calls to lock_job, specifically the second
+            # parameter which controls the job_id cursor value.
+            it "continues lock from previous job id, until cursor expires" do
+              @epoch = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+              expect_to_lock_with(cursor: 0, locked_job: job_1)
+              expect_to_work(job_1)
+
+              @epoch += 2
+              expect_to_lock_with(cursor: job_1[:job_id], locked_job: job_2)
+              expect_to_work(job_2)
+
+              @epoch += cursor_expiry # our cursor should now expire
+              expect_to_lock_with(cursor: 0, locked_job: job_3)
+              expect_to_work(job_3)
+            end
+          end
+        end
+        # rubocop:enable RSpec/SubjectStub
+        # rubocop:enable RSpec/InstanceVariable
+      end
+    end
+  end
 end
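
The cursor assertions above reduce to a simple rule: keep scanning from the last locked job id until either nothing is found or cursor_expiry elapses, then start again from the front of the queue. A self-contained restatement of that rule, as an illustration of the behaviour under test rather than the locker's actual code:

def next_cursor(previous_job_id, cursor_expired)
  return 0 if previous_job_id.nil? # no job found: rescan from the front of the queue
  return 0 if cursor_expired       # cursor_expiry elapsed: also rescan from the front

  previous_job_id                  # otherwise continue from the job we just locked
end

next_cursor(nil, false) # => 0
next_cursor(123, false) # => 123
next_cursor(123, true)  # => 0
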
+ it "continues lock from previous job id, until cursor expires" do + @epoch = Process.clock_gettime(Process::CLOCK_MONOTONIC) + expect_to_lock_with(cursor: 0, locked_job: job_1) + expect_to_work(job_1) + + @epoch += 2 + expect_to_lock_with(cursor: job_1[:job_id], locked_job: job_2) + expect_to_work(job_2) + + @epoch += cursor_expiry # our cursor should now expire + expect_to_lock_with(cursor: 0, locked_job: job_3) + expect_to_work(job_3) + end + end + end + # rubocop:enable RSpec/SubjectStub + # rubocop:enable RSpec/InstanceVariable + end + end + end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 62222a1..03fd661 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -34,6 +34,22 @@ def establish_database_connection Que.connection = ActiveRecord Que.migrate! + +class LockDataBaseRecord < ActiveRecord::Base + def self.establish_lock_database_connection + establish_connection( + adapter: "postgresql", + host: ENV.fetch("LOCK_PGHOST", "localhost"), + user: ENV.fetch("LOCK_PGUSER", "postgres"), + password: ENV.fetch("LOCK_PGPASSWORD", ""), + database: ENV.fetch("LOCK_PGDATABASE", "lock-test"), + ) + end + def self.connection + establish_lock_database_connection.connection + end +end + # Ensure we have a logger, so that we can test the code paths that log Que.logger = Logger.new("/dev/null")