From 75b372d25a815a79b18f0d09885811e3f2aa6aa1 Mon Sep 17 00:00:00 2001 From: Joseph Southan Date: Wed, 24 Apr 2024 16:35:19 +0100 Subject: [PATCH] Get actual job class if using and ActiveJob wrapper So that metrics and logging will work --- lib/que/worker.rb | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/lib/que/worker.rb b/lib/que/worker.rb index f15f463..a511e0e 100644 --- a/lib/que/worker.rb +++ b/lib/que/worker.rb @@ -148,13 +148,13 @@ def work_loop case event = work when :postgres_error Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval) - @tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do + @tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do sleep(@wake_interval) end when :job_not_found Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval) - @tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do - sleep(@wake_interval) + @tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do + sleep(@wake_interval) end when :job_worked nil # immediately find a new job to work @@ -177,7 +177,7 @@ def work # TODO alerting / monitoring pre addition of secondary queues # assume that a worker would not work a job from another queue. # With the addition of secondary queues this is no longer true. - # To support moving to the new `primary_queue` field without + # To support moving to the new `primary_queue` field without # disrupting existing tooling we "lie" and say the queue worked # is the primary queue. Longer term alerting / monitoring should move # to look at `primary_queue` and `worked_queue` instead. @@ -185,14 +185,14 @@ def work primary_queue: @queue, worked_queue: job["queue"], handler: job["job_class"], - job_class: job["job_class"], + job_class: actual_job_class_name(job["job_class"], job["args"]), job_error_count: job["error_count"], que_job_id: job["job_id"], } labels = { - job_class: job["job_class"], - priority: job["priority"], + job_class: actual_job_class_name(job["job_class"], job["args"]), + priority: job["priority"], queue: @queue, primary_queue: @queue, worked_queue: job["queue"], @@ -300,5 +300,11 @@ def handle_job_failure(error, job) def class_for(string) Que.constantize(string) end + + def actual_job_class_name(class_name, args) + return args.first["job_class"] if /ActiveJob::QueueAdapters/.match?(class_name) + + class_name + end end end