From af8d14c5ac99b90ebef401fe2aa9319656dc276a Mon Sep 17 00:00:00 2001 From: benk-gc Date: Fri, 5 Jul 2024 13:07:47 +0100 Subject: [PATCH] Instrument adapter queries for external programs. When executing queries via the adapter, we may not recieve the instrumentation we expect. The ActiveRecord adapter uses a raw connection which bypasses the ActiveRecord instrumentation layer and instead talks directly to PG, making it impossible to know what queries are being executed. This change cribs heavily from the implementation of the `log` method on the `AbstractAdapter` class in ActiveRecord to make it easy to add instrumentation to an adapter. By default the instrumenter is set to `nil`, and instrumentation will be a no-op; otherwise the query will be instrumented via the `instrument` method and passed the relevant payload to handle. The structure of this is deliberately similar to that of the event emitted by ActiveRecord's `sql.active_record` notification. --- lib/que/adapters/active_record.rb | 5 +++++ lib/que/adapters/base.rb | 36 +++++++++++++++++++++++++++++-- spec/lib/que/job_spec.rb | 34 +++++++++++++++++++++++++++-- 3 files changed, 71 insertions(+), 4 deletions(-) diff --git a/lib/que/adapters/active_record.rb b/lib/que/adapters/active_record.rb index 1a0edfd..6a4b02f 100644 --- a/lib/que/adapters/active_record.rb +++ b/lib/que/adapters/active_record.rb @@ -13,6 +13,11 @@ class ActiveRecord < Base ::PG::UnableToSend, ].freeze + def initialize(_thing = nil) + super + @instrumenter = ActiveSupport::Notifications.instrumenter + end + def checkout checkout_activerecord_adapter { |conn| yield conn.raw_connection } rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index 669e9d9..1295e01 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -13,8 +13,11 @@ module Adapters class UnavailableConnection < StandardError; end class Base + attr_reader :instrumenter + def initialize(_thing = nil) @prepared_statements = {} + @instrumenter = nil end # The only method that adapters really need to implement. Should lock a @@ -62,7 +65,14 @@ def in_transaction? def execute_sql(sql, params) args = params.empty? ? [sql] : [sql, params] - checkout { |conn| conn.async_exec(*args) } + + checkout do |conn| + log(sql, conn, params, async: true) do |notification_payload| + conn.async_exec(*args).tap do |result| + notification_payload[:row_count] = result.count + end + end + end end def execute_prepared(name, params) @@ -81,7 +91,11 @@ def execute_prepared(name, params) prepared_just_now = statements[name] = true end - conn.exec_prepared("que_#{name}", params) + log(SQL[name], conn, params, async: false) do |notification_payload| + conn.exec_prepared("que_#{name}", params).tap do |result| + notification_payload[:row_count] = result.count + end + end rescue ::PG::InvalidSqlStatementName => e # Reconnections on ActiveRecord can cause the same connection # objects to refer to new backends, so recover as well as we can. @@ -97,6 +111,24 @@ def execute_prepared(name, params) end end + # rubocop:disable Metrics/ParameterLists + def log(sql, conn, binds = [], type_casted_binds = [], name = "SQL", statement_name = nil, async: false, &block) + return yield({}) if instrumenter.nil? + + instrumenter.instrument( + "que.execute", + sql: sql, + name: name, + binds: binds, + type_casted_binds: type_casted_binds, + async: async, + statement_name: statement_name, + connection: conn, + &block + ) + end + # rubocop:enable Metrics/ParameterLists + CAST_PROCS = { # booleans 16 => ->(value) { diff --git a/spec/lib/que/job_spec.rb b/spec/lib/que/job_spec.rb index 33b4f27..dafed51 100644 --- a/spec/lib/que/job_spec.rb +++ b/spec/lib/que/job_spec.rb @@ -5,6 +5,11 @@ RSpec.describe Que::Job do describe ".enqueue" do let(:run_at) { postgres_now } + let(:job_class) do + Class.new(described_class).tap do |klass| + stub_const("FooBarJob", klass) + end + end it "adds job to que_jobs table" do expect { described_class.enqueue(:hello, run_at: run_at) }. @@ -41,7 +46,7 @@ que_job_id: an_instance_of(Integer), queue: "default", priority: 100, - job_class: a_string_including("Class"), + job_class: "FooBarJob", retryable: true, run_at: run_at, args: [500, "gbp", "testing"], @@ -49,7 +54,6 @@ custom_log_2: "test-log", ) - job_class = Class.new(described_class) job_class.custom_log_context ->(attrs) { { custom_log_1: attrs[:args][0], @@ -59,6 +63,32 @@ job_class.enqueue(500, :gbp, :testing, run_at: run_at) end + it "instruments queries using ActiveSupport::Notifications" do + events = [] + + ActiveSupport::Notifications.subscribe("que.execute") do |event| + events << event + end + + job_class.enqueue("foobar") + + expect(events).to include( + having_attributes( + name: "que.execute", + payload: hash_including( + sql: /INSERT INTO que_jobs/, + name: "SQL", + binds: [nil, nil, nil, "FooBarJob", true, "[\"foobar\"]"], + type_casted_binds: [], + async: false, + statement_name: nil, + connection: instance_of(PG::Connection), + row_count: 1, + ), + ), + ) + end + context "with a custom adapter specified" do let(:custom_adapter) { Que.adapter.dup } let(:job_with_adapter) { Class.new(described_class) }