diff --git a/lib/que/adapters/active_record.rb b/lib/que/adapters/active_record.rb index 1a0edfd..6a4b02f 100644 --- a/lib/que/adapters/active_record.rb +++ b/lib/que/adapters/active_record.rb @@ -13,6 +13,11 @@ class ActiveRecord < Base ::PG::UnableToSend, ].freeze + def initialize(_thing = nil) + super + @instrumenter = ActiveSupport::Notifications.instrumenter + end + def checkout checkout_activerecord_adapter { |conn| yield conn.raw_connection } rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index edf0b92..898ed2c 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -13,8 +13,11 @@ module Adapters class UnavailableConnection < StandardError; end class Base + attr_reader :instrumenter + def initialize(_thing = nil) @prepared_statements = {} + @instrumenter = nil end # The only method that adapters really need to implement. Should lock a @@ -62,7 +65,14 @@ def in_transaction? def execute_sql(sql, params) args = params.empty? ? [sql] : [sql, params] - checkout { |conn| conn.async_exec(*args) } + + checkout do |conn| + log(sql, conn, params, async: true) do |notification_payload| + conn.async_exec(*args).tap do |result| + notification_payload[:row_count] = result.count + end + end + end end def execute_prepared(name, params) @@ -81,7 +91,11 @@ def execute_prepared(name, params) prepared_just_now = statements[name] = true end - conn.exec_prepared("que_#{name}", params) + log(SQL[name], conn, params, async: false) do |notification_payload| + conn.exec_prepared("que_#{name}", params).tap do |result| + notification_payload[:row_count] = result.count + end + end rescue ::PG::InvalidSqlStatementName => error # Reconnections on ActiveRecord can cause the same connection # objects to refer to new backends, so recover as well as we can. @@ -97,6 +111,22 @@ def execute_prepared(name, params) end end + def log(sql, conn, binds = [], type_casted_binds = [], name = "SQL", statement_name = nil, async: false, &block) + return yield({}) if instrumenter.nil? + + instrumenter.instrument( + "que.execute", + sql: sql, + name: name, + binds: binds, + type_casted_binds: type_casted_binds, + async: async, + statement_name: statement_name, + connection: conn, + &block + ) + end + CAST_PROCS = { # booleans 16 => ->(value) { diff --git a/spec/lib/que/job_spec.rb b/spec/lib/que/job_spec.rb index 09a7112..fd11c83 100644 --- a/spec/lib/que/job_spec.rb +++ b/spec/lib/que/job_spec.rb @@ -5,6 +5,11 @@ RSpec.describe Que::Job do describe ".enqueue" do let(:run_at) { postgres_now } + let(:job_class) do + Class.new(described_class).tap do |klass| + stub_const("FooBarJob", klass) + end + end it "adds job to que_jobs table" do expect { described_class.enqueue(:hello, run_at: run_at) }. @@ -41,7 +46,7 @@ que_job_id: an_instance_of(Integer), queue: "default", priority: 100, - job_class: a_string_including("Class"), + job_class: "FooBarJob", retryable: true, run_at: run_at, args: [500, "gbp", "testing"], @@ -49,7 +54,6 @@ custom_log_2: "test-log", ) - job_class = Class.new(described_class) job_class.custom_log_context ->(attrs) { { custom_log_1: attrs[:args][0], @@ -59,6 +63,32 @@ job_class.enqueue(500, :gbp, :testing, run_at: run_at) end + it "instruments queries using ActiveSupport::Notifications" do + events = [] + + ActiveSupport::Notifications.subscribe("que.execute") do |event| + events << event + end + + job_class.enqueue("foobar") + + expect(events).to include( + having_attributes( + name: "que.execute", + payload: hash_including( + sql: /INSERT INTO que_jobs/, + name: "SQL", + binds: [nil, nil, nil, "FooBarJob", true, "[\"foobar\"]"], + type_casted_binds: [], + async: false, + statement_name: nil, + connection: instance_of(PG::Connection), + row_count: 1, + ), + ), + ) + end + context "with a custom adapter specified" do let(:custom_adapter) { Que.adapter.dup } let(:job_with_adapter) { Class.new(described_class) }