Skip to content

Commit

Permalink
Merge pull request #101 from gocardless/joesouthan-aj-realjob
Browse files Browse the repository at this point in the history
Get actual job class if using an ActiveJob wrapper
  • Loading branch information
JoeSouthan authored Apr 24, 2024
2 parents 68a8802 + 75b372d commit 0ea7359
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions lib/que/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ def work_loop
case event = work
when :postgres_error
Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
end
when :job_not_found
Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
end
when :job_worked
nil # immediately find a new job to work
Expand All @@ -177,22 +177,22 @@ def work
# TODO alerting / monitoring pre addition of secondary queues
# assume that a worker would not work a job from another queue.
# With the addition of secondary queues this is no longer true.
# To support moving to the new `primary_queue` field without
# To support moving to the new `primary_queue` field without
# disrupting existing tooling we "lie" and say the queue worked
# is the primary queue. Longer term alerting / monitoring should move
# to look at `primary_queue` and `worked_queue` instead.
queue: @queue,
primary_queue: @queue,
worked_queue: job["queue"],
handler: job["job_class"],
job_class: job["job_class"],
job_class: actual_job_class_name(job["job_class"], job["args"]),
job_error_count: job["error_count"],
que_job_id: job["job_id"],
}

labels = {
job_class: job["job_class"],
priority: job["priority"],
job_class: actual_job_class_name(job["job_class"], job["args"]),
priority: job["priority"],
queue: @queue,
primary_queue: @queue,
worked_queue: job["queue"],
Expand Down Expand Up @@ -300,5 +300,11 @@ def handle_job_failure(error, job)
def class_for(string)
Que.constantize(string)
end

def actual_job_class_name(class_name, args)
return args.first["job_class"] if /ActiveJob::QueueAdapters/.match?(class_name)

class_name
end
end
end

0 comments on commit 0ea7359

Please sign in to comment.