Skip to content

Commit

Permalink
Get actual job class if using and ActiveJob wrapper
Browse files Browse the repository at this point in the history
So that metrics and logging will work
  • Loading branch information
JoeSouthan committed Apr 24, 2024
1 parent 68a8802 commit 75b372d
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions lib/que/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ def work_loop
case event = work
when :postgres_error
Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
end
when :job_not_found
Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
end
when :job_worked
nil # immediately find a new job to work
Expand All @@ -177,22 +177,22 @@ def work
# TODO alerting / monitoring pre addition of secondary queues
# assume that a worker would not work a job from another queue.
# With the addition of secondary queues this is no longer true.
# To support moving to the new `primary_queue` field without
# To support moving to the new `primary_queue` field without
# disrupting existing tooling we "lie" and say the queue worked
# is the primary queue. Longer term alerting / monitoring should move
# to look at `primary_queue` and `worked_queue` instead.
queue: @queue,
primary_queue: @queue,
worked_queue: job["queue"],
handler: job["job_class"],
job_class: job["job_class"],
job_class: actual_job_class_name(job["job_class"], job["args"]),
job_error_count: job["error_count"],
que_job_id: job["job_id"],
}

labels = {
job_class: job["job_class"],
priority: job["priority"],
job_class: actual_job_class_name(job["job_class"], job["args"]),
priority: job["priority"],
queue: @queue,
primary_queue: @queue,
worked_queue: job["queue"],
Expand Down Expand Up @@ -300,5 +300,11 @@ def handle_job_failure(error, job)
def class_for(string)
Que.constantize(string)
end

def actual_job_class_name(class_name, args)
return args.first["job_class"] if /ActiveJob::QueueAdapters/.match?(class_name)

class_name
end
end
end

0 comments on commit 75b372d

Please sign in to comment.