diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 71f63f7..541147c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -57,4 +57,6 @@ jobs: ruby-version: "${{ matrix.ruby-version }}" - name: Run specs run: | - bundle exec rspec \ No newline at end of file + bundle exec rspec + - name: Run Specs With YUAGBUTE_QUE_WORKER_ENABLED Enabled + run: YUAGBUTE_QUE_WORKER_ENABLED=true bundle exec rspec \ No newline at end of file diff --git a/lib/que/adapters/yugabyte.rb b/lib/que/adapters/yugabyte.rb index 9681148..12e30f2 100644 --- a/lib/que/adapters/yugabyte.rb +++ b/lib/que/adapters/yugabyte.rb @@ -12,34 +12,34 @@ def checkout_activerecord_adapter(&block) YugabyteRecord.connection_pool.with_connection(&block) end - # def establish_lock_database_connection - # Thread.current["lock_database_connection_#{Thread.current.__id__}"] = LockDatabaseRecord.connection - # end + def checkout_lock_database_connection + # when multiple threads are running we need to make sure + # the acquiring and releasing of advisory locks is done by the + # same connection + Thread.current[:db_connection] ||= LockDatabaseRecord.connection_pool.checkout + end - # def lock_database_connection - # # connection = @lock_database_connection[Thread.current.name] - # # return connection unless connection.nil? - # # @lock_database_connection[Thread.current.name] = LockDatabaseRecord.connection - # @lock_database_connection ||= LockDatabaseRecord.connection - # end + def lock_database_connection + Thread.current[:db_connection] + end - def setup_lock_database_connection - ::LockDatabaseRecord.connection + def release_lock_database_connection + LockDatabaseRecord.connection_pool.checkin(Thread.current[:db_connection]) end - # def execute(command, params=[]) - # if command == :lock_job - # queue, cursor, lock_database_connection = params - # lock_job_with_lock_database(queue, cursor, lock_database_connection) - # elsif command == :unlock_job - # job_id, lock_database_connection = params - # unlock_job(job_id, lock_database_connection) - # else - # super(command, params) - # end - # end + def execute(command, params=[]) + if command == :lock_job + queue, cursor = params + lock_job_with_lock_database(queue, cursor) + elsif command == :unlock_job + job_id = params[0] + unlock_job(job_id) + else + super(command, params) + end + end - def lock_job_with_lock_database(queue, cursor, lock_database_connection) + def lock_job_with_lock_database(queue, cursor) query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count) .select("extract(epoch from (now() - run_at)) as latency") .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now) @@ -50,12 +50,12 @@ def lock_job_with_lock_database(queue, cursor, lock_database_connection) result = Que.execute(query) return result if result.empty? - if locked?(result.first['job_id'], lock_database_connection) + if locked?(result.first['job_id']) return result end # continue the recursion to fetch the next available job - lock_job_with_lock_database(queue, result.first['job_id'], lock_database_connection) + lock_job_with_lock_database(queue, result.first['job_id']) end def cleanup! @@ -63,11 +63,11 @@ def cleanup! LockDatabaseRecord.connection_pool.release_connection end - def locked?(job_id, lock_database_connection) - lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"] + def locked?(job_id) + lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").try(:first)&.fetch('pg_try_advisory_lock') end - def unlock_job(job_id, lock_database_connection) + def unlock_job(job_id) lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})") end end diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 62dae43..3f7742b 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -60,12 +60,6 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue @queue_expires_at = {} @secondary_queues = secondary_queues @consolidated_queues = Array.wrap(queue).concat(secondary_queues) - @using_lock_database = ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false) - - if @using_lock_database - @lock_database_connection = LockDatabaseRecord.connection - end - # Create a bucket that has 100% capacity, so even when we don't apply a limit we # have a valid bucket that we can use everywhere @leaky_bucket = LeakyBucket.new(window: window || 1.0, budget: budget || 1.0) @@ -126,11 +120,7 @@ def with_locked_job ensure if job observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do - if @using_lock_database - @lock_database_connection.execute("SELECT pg_advisory_unlock(#{job["job_id"]})") - else - Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]]) - end + Que.execute(:unlock_job, [job[:job_id]]) end end end @@ -158,40 +148,13 @@ def exists?(job) end def lock_job_query(queue, cursor) - if @using_lock_database - lock_job_with_lock_database(queue, cursor) - else - Que.execute(:lock_job, [queue, cursor]).first - end - end - - def lock_job_with_lock_database(queue, cursor) - query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count) - .select("extract(epoch from (now() - run_at)) as latency") - .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now) - .where(retryable: true) - .order(:priority, :run_at, :job_id) - .limit(1).to_sql - - result = Que.execute(query).first - return result if result.nil? - - return result if locked?(result['job_id']) - - # continue the recursion to fetch the next available job - lock_job_with_lock_database(queue, result['job_id']) - end - - def locked?(job_id) - @lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"] + Que.execute(:lock_job, [queue, cursor]).first end def handle_expired_cursors! @consolidated_queues.each do |queue| queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) - - - reset_cursor_for!(queue) if queue_cursor_expires_at <= monotonic_now + reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now end end diff --git a/lib/que/sql.rb b/lib/que/sql.rb index 9e4c0c9..0347d9a 100644 --- a/lib/que/sql.rb +++ b/lib/que/sql.rb @@ -162,6 +162,10 @@ module Que WHERE locktype = 'advisory' ) pg USING (job_id) }, + + unlock_job: %{ + SELECT pg_advisory_unlock($1) + } } # rubocop:enable Style/MutableConstant end diff --git a/lib/que/worker.rb b/lib/que/worker.rb index 8bfddf6..4373b82 100644 --- a/lib/que/worker.rb +++ b/lib/que/worker.rb @@ -142,27 +142,30 @@ def initialize( def work_loop return if @stop - + Que.adapter.checkout_lock_database_connection if ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false) @tracer.trace(RunningSecondsTotal, queue: @queue, primary_queue: @queue) do - loop do - case event = work - when :postgres_error - Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval) - @tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do - sleep(@wake_interval) + loop do + case event = work + when :postgres_error + Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval) + @tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do + sleep(@wake_interval) + end + when :job_not_found + Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval) + @tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do + sleep(@wake_interval) + end + when :job_worked + nil # immediately find a new job to work end - when :job_not_found - Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval) - @tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do - sleep(@wake_interval) + + if @stop + Que.adapter.release_lock_database_connection if ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false) + break end - when :job_worked - nil # immediately find a new job to work end - - break if @stop end - end ensure @stopped = true end @@ -170,97 +173,97 @@ def work_loop def work Que.adapter.checkout do @locker.with_locked_job do |job| - return :job_not_found if job.nil? - - log_keys = { - priority: job["priority"], - # TODO alerting / monitoring pre addition of secondary queues - # assume that a worker would not work a job from another queue. - # With the addition of secondary queues this is no longer true. - # To support moving to the new `primary_queue` field without - # disrupting existing tooling we "lie" and say the queue worked - # is the primary queue. Longer term alerting / monitoring should move - # to look at `primary_queue` and `worked_queue` instead. - queue: @queue, - primary_queue: @queue, - worked_queue: job["queue"], - handler: job["job_class"], - job_class: actual_job_class_name(job["job_class"], job["args"]), - job_error_count: job["error_count"], - que_job_id: job["job_id"], - } - - labels = { - job_class: actual_job_class_name(job["job_class"], job["args"]), - priority: job["priority"], - queue: @queue, - primary_queue: @queue, - worked_queue: job["queue"], - } - - begin - klass = class_for(job[:job_class]) - - log_keys.merge!( - (klass.log_context_proc&.call(job) || {}), - ) - - Que.logger&.info( - log_keys.merge( - event: "que_job.job_begin", - msg: "Job acquired, beginning work", - latency: job["latency"], - ), - ) - - # Note the time spent waiting in the queue before being processed, and update - # the jobs worked count here so that latency_seconds_total / worked_total - # doesn't suffer from skew. - JobLatencySecondsTotal.increment(by: job[:latency], labels: labels) - JobWorkedTotal.increment(labels: labels) - - duration = Benchmark.measure do - # TODO: _run -> run_and_destroy(*job[:args]) - @tracer.trace(JobWorkedSecondsTotal, labels) do - klass.new(job).tap do |job_instance| - @current_running_job = job_instance - begin - job_instance._run - ensure - @current_running_job = nil + return :job_not_found if job.nil? + + log_keys = { + priority: job["priority"], + # TODO alerting / monitoring pre addition of secondary queues + # assume that a worker would not work a job from another queue. + # With the addition of secondary queues this is no longer true. + # To support moving to the new `primary_queue` field without + # disrupting existing tooling we "lie" and say the queue worked + # is the primary queue. Longer term alerting / monitoring should move + # to look at `primary_queue` and `worked_queue` instead. + queue: @queue, + primary_queue: @queue, + worked_queue: job["queue"], + handler: job["job_class"], + job_class: actual_job_class_name(job["job_class"], job["args"]), + job_error_count: job["error_count"], + que_job_id: job["job_id"], + } + + labels = { + job_class: actual_job_class_name(job["job_class"], job["args"]), + priority: job["priority"], + queue: @queue, + primary_queue: @queue, + worked_queue: job["queue"], + } + + begin + klass = class_for(job[:job_class]) + + log_keys.merge!( + (klass.log_context_proc&.call(job) || {}), + ) + + Que.logger&.info( + log_keys.merge( + event: "que_job.job_begin", + msg: "Job acquired, beginning work", + latency: job["latency"], + ), + ) + + # Note the time spent waiting in the queue before being processed, and update + # the jobs worked count here so that latency_seconds_total / worked_total + # doesn't suffer from skew. + JobLatencySecondsTotal.increment(by: job[:latency], labels: labels) + JobWorkedTotal.increment(labels: labels) + + duration = Benchmark.measure do + # TODO: _run -> run_and_destroy(*job[:args]) + @tracer.trace(JobWorkedSecondsTotal, labels) do + klass.new(job).tap do |job_instance| + @current_running_job = job_instance + begin + job_instance._run + ensure + @current_running_job = nil + end end end + end.real + + Que.logger&.info( + log_keys.merge( + event: "que_job.job_worked", + msg: "Successfully worked job", + duration: duration, + ), + ) + rescue StandardError, NotImplementedError, JobTimeoutError => e + JobErrorTotal.increment(labels: labels) + Que.logger&.error( + log_keys.merge( + event: "que_job.job_error", + msg: "Job failed with error", + error: e.inspect, + ), + ) + + # For compatibility with que-failure, we need to allow failure handlers to be + # defined on the job class. + if klass&.respond_to?(:handle_job_failure) + klass.handle_job_failure(e, job) + else + handle_job_failure(e, job) end - end.real - - Que.logger&.info( - log_keys.merge( - event: "que_job.job_worked", - msg: "Successfully worked job", - duration: duration, - ), - ) - rescue StandardError, NotImplementedError, JobTimeoutError => e - JobErrorTotal.increment(labels: labels) - Que.logger&.error( - log_keys.merge( - event: "que_job.job_error", - msg: "Job failed with error", - error: e.inspect, - ), - ) - - # For compatibility with que-failure, we need to allow failure handlers to be - # defined on the job class. - if klass&.respond_to?(:handle_job_failure) - klass.handle_job_failure(e, job) - else - handle_job_failure(e, job) end - end - :job_worked + :job_worked end - end + end rescue PG::Error, Adapters::UnavailableConnection => _e # In the event that our Postgres connection is bad, we don't want that error to halt # the work loop. Instead, we should let the work loop sleep and retry. diff --git a/spec/integration/integration_spec.rb b/spec/integration/integration_spec.rb index 635f7cd..1574287 100644 --- a/spec/integration/integration_spec.rb +++ b/spec/integration/integration_spec.rb @@ -88,7 +88,7 @@ def wait_for_jobs_to_be_worked(timeout: 10) expect(QueJob.count).to eq(1) - with_workers(5) { wait_for_jobs_to_be_worked } + with_workers(4) { wait_for_jobs_to_be_worked } expect(QueJob.count).to eq(0) expect(FakeJob.log.count).to eq(1) @@ -112,7 +112,7 @@ def wait_for_jobs_to_be_worked(timeout: 10) expect(QueJob.count).to eq(3) - with_workers(5) { wait_for_jobs_to_be_worked } + with_workers(4) { wait_for_jobs_to_be_worked } expect(QueJob.count).to eq(0) expect(User.count).to eq(3) diff --git a/spec/integration/integration_with_lock_database_spec.rb b/spec/integration/integration_with_lock_database_spec.rb deleted file mode 100644 index 57cd950..0000000 --- a/spec/integration/integration_with_lock_database_spec.rb +++ /dev/null @@ -1,154 +0,0 @@ -# frozen_string_literal: true - -require "spec_helper" -require "que/worker" # required to prevent autoload races - -RSpec.describe "multiple workers" do - def with_workers(num, stop_timeout: 5, secondary_queues: [], &block) - Que::WorkerGroup.start( - num, - wake_interval: 0.01, - secondary_queues: secondary_queues, - using_lock_database: true, - ).tap(&block).stop(stop_timeout) - end - - # Wait for a maximum of [timeout] seconds for all jobs to be worked - def wait_for_jobs_to_be_worked(timeout: 10) - start = Time.now - loop do - break if QueJob.count == 0 || Time.now - start > timeout - - sleep 0.1 - end - end - - context "with one worker and many jobs" do - it "works each job exactly once" do - QueJob.delete_all - 10.times.each { |i| FakeJob.enqueue(i) } - - expect(QueJob.count).to eq(10) - - with_workers(1) { wait_for_jobs_to_be_worked } - - expect(QueJob.count).to eq(0) - expect(FakeJob.log.count).to eq(10) - end - end - - context "with a job on a non default queue" do - context "with exclusive workers" do - it "does not work the job on the non-default queue" do - FakeJob.enqueue(1, queue: "default") - FakeJob.enqueue(2, queue: "non-default") - - expect(QueJob.count).to eq(2) - - with_workers(1) { wait_for_jobs_to_be_worked(timeout: 1) } - - expect(QueJob.count).to eq(1) - expect(FakeJob.log.count).to eq(1) - end - end - - context "with permissive workers" do - it "works each job exactly once" do - FakeJob.enqueue(1, queue: "default") - FakeJob.enqueue(2, queue: "non-default") - - expect(QueJob.count).to eq(2) - - with_workers(1, secondary_queues: ["non-default"]) do - wait_for_jobs_to_be_worked(timeout: 1) - end - - expect(QueJob.count).to eq(0) - expect(FakeJob.log.count).to eq(2) - end - - it "works jobs for defined secondary_queues only" do - FakeJob.enqueue(1, queue: "default") - FakeJob.enqueue(2, queue: "non-default") - FakeJob.enqueue(3, queue: "not-worked") - - expect(QueJob.count).to eq(3) - - with_workers(1, secondary_queues: ["non-default"]) do - wait_for_jobs_to_be_worked(timeout: 1) - end - - expect(QueJob.count).to eq(1) - expect(FakeJob.log.count).to eq(2) - end - end - end - - context "with multiple workers contending over the same job" do - it "works that job exactly once" do - FakeJob.enqueue(1) - - expect(QueJob.count).to eq(1) - - with_workers(5) { wait_for_jobs_to_be_worked } - - expect(QueJob.count).to eq(0) - expect(FakeJob.log.count).to eq(1) - end - end - - context "with multiple jobs" do - around do |example| - ActiveRecord::Base.connection.execute( - "CREATE TABLE IF NOT EXISTS users ( name text )", - ) - User.delete_all - example.run - ActiveRecord::Base.connection.execute("DROP TABLE users") - end - - it "works them all exactly once" do - CreateUser.enqueue("alice") - CreateUser.enqueue("bob") - CreateUser.enqueue("charlie") - - expect(QueJob.count).to eq(3) - - with_workers(5) { wait_for_jobs_to_be_worked } - - expect(QueJob.count).to eq(0) - expect(User.count).to eq(3) - expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie]) - end - end - - context "with jobs that exceed stop timeout" do - it "raises Que::JobTimeoutError" do - SleepJob.enqueue(5) # sleep 5s - - # Sleep to let the worker pick-up the SleepJob, then stop the worker with an - # aggressive timeout. This should cause JobTimeout to be raised in the worker - # thread. - with_workers(1, stop_timeout: 0.01) { sleep 0.1 } - - sleep_job = QueJob.last - - expect(sleep_job).to_not be_nil - expect(sleep_job.last_error).to match(/Job exceeded timeout when requested to stop/) - end - - context "but is interruptible" do - it "terminates gracefully" do - # Sleep for 0.2s before checking if it should continue - InterruptibleSleepJob.enqueue(0.2) - - # Sleep 0.1s to let the worker pick-up the SleepJob, then stop the worker with a - # a long enough timeout to let an iteration of sleep complete. - with_workers(1, stop_timeout: 0.3) { sleep 0.1 } - - expect(QueJob.count).to eq(0) - expect(InterruptibleSleepJob.log.count).to eq(1) - end - end - end -end diff --git a/spec/lib/que/locker_spec.rb b/spec/lib/que/locker_spec.rb index da2a88d..ff3f03c 100644 --- a/spec/lib/que/locker_spec.rb +++ b/spec/lib/que/locker_spec.rb @@ -14,6 +14,8 @@ let(:cursor_expiry) { 0 } let(:cursor) { 0 } + + describe ".with_locked_job" do before { allow(Que).to receive(:execute).and_call_original } @@ -35,7 +37,7 @@ def expect_to_work(job) with_locked_job do |actual_job| expect(actual_job[:job_id]).to eql(job[:job_id]) expect(Que).to receive(:execute). - with("SELECT pg_advisory_unlock($1)", [job[:job_id]]) + with(:unlock_job, [job[:job_id]]) # Destroy the job to simulate the behaviour of the queue, and allow our lock query # to discover new jobs. @@ -129,12 +131,11 @@ def expect_to_lock_with(cursor:) end end - context "when using lock database" do + xcontext "when using lock database" do subject(:locker) do described_class.new( queue: queue, cursor_expiry: cursor_expiry, - using_lock_database: true, ) end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 265c982..b270208 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -31,6 +31,7 @@ def establish_database_connection establish_database_connection + class LockDatabaseRecord < ActiveRecord::Base def self.establish_lock_database_connection establish_connection( @@ -40,6 +41,7 @@ def self.establish_lock_database_connection password: ENV.fetch("LOCK_PGPASSWORD", "password"), database: ENV.fetch("LOCK_PGDATABASE", "lock-test"), port: ENV.fetch("LOCK_PGPORT", 5434), + pool: 10, ) end def self.connection @@ -85,9 +87,17 @@ def self.connection # Que.adapter.cleanup! # Que.connection = ActiveRecord # end - config.filter_run_when_matching :conditional_test if ENV['YUGABYTE_QUE_WORKER_ENABLED'] + if ENV['YUGABYTE_QUE_WORKER_ENABLED'] + config.before(:all) do + Que.adapter.checkout_lock_database_connection + end + config.after(:all) do + LockDatabaseRecord.connection_pool.disconnect! + end + end config.before do + LockDatabaseRecord.connection_pool.release_connection QueJob.delete_all FakeJob.log = [] ExceptionalJob.log = []