diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index 62b186a1..92bef39f 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -38,7 +38,7 @@ class Job < ::ActiveRecord::Base :failed_at, :locked_at, :locked_by, :handler end - scope :by_priority, lambda { order("priority ASC, run_at ASC") } + scope :by_priority, lambda { order(:priority, :run_at) } scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority } scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority } scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? } @@ -52,13 +52,14 @@ def self.set_delayed_job_table_name set_delayed_job_table_name - def self.ready_to_run(worker_name, max_run_time) - where( - "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL", - db_time_now, - db_time_now - max_run_time, - worker_name - ) + def self.ready_to_run(worker_name, max_run_time) # rubocop:disable Metrics/AbcSize + not_failed = arel_table[:failed_at].eq(nil) + time_to_run = arel_table[:run_at].lteq(db_time_now) + not_locked = arel_table[:locked_at].eq(nil) + lock_expired = arel_table[:locked_at].lt(db_time_now - max_run_time) + locked_by_me = arel_table[:locked_by].eq(worker_name) + + where(not_failed.and(time_to_run).and(not_locked.or(lock_expired).or(locked_by_me))) end def self.before_fork @@ -128,11 +129,14 @@ def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE # Note: active_record would attempt to generate UPDATE...LIMIT like # SQL for Postgres if we use a .limit() filter, but it would not - # use 'FOR UPDATE' and we would have many locking conflicts - quoted_name = connection.quote_table_name(table_name) - subquery = ready_scope.limit(1).lock(true).select("id").to_sql - sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *" - reserved = find_by_sql([sql, now, worker.name]) + # use 'FOR UPDATE' and we would have many locking conflicts. + # This was further updated for some edge cases around locking multiple records + # with solutions discussed at length here + # https://dba.stackexchange.com/questions/69471/postgres-update-limit-1 + quoted_name = Delayed::Job.arel_table.alias(:dj).to_sql + subquery = ready_scope.where("pg_try_advisory_xact_lock(id)").limit(1).lock(true).select(:id).to_sql + sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE dj.id = (#{subquery}) RETURNING dj.*" + reserved = transaction { find_by_sql([sql, now, worker.name]) } reserved[0] end @@ -143,12 +147,16 @@ def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) # while updating. But during the where clause, for mysql(>=5.6.4), # it queries with precision as well. So removing the precision now = now.change(usec: 0) + # This works on MySQL and possibly some other DBs that support # UPDATE...LIMIT. It uses separate queries to lock and return the job - count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name) - return nil if count == 0 + sets = "locked_at = :now, locked_by = :name, id = (SELECT @dj_update_id := id)" + transaction do + count = ready_scope.limit(1).update_all([sets, now: now, name: worker.name]) + return nil if count == 0 - where(locked_at: now, locked_by: worker.name, failed_at: nil).first + where("id = @dj_update_id").first + end end def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)