Skip to content

Commit

Permalink
Instrument adapter queries for external programs.
Browse files Browse the repository at this point in the history
When executing queries via the adapter, we may not recieve the
instrumentation we expect. The ActiveRecord adapter uses a raw
connection which bypasses the ActiveRecord instrumentation layer and
instead talks directly to PG, making it impossible to know what queries
are being executed.

This change cribs heavily from the implementation of the `log` method on
the `AbstractAdapter` class in ActiveRecord to make it easy to add
instrumentation to an adapter. By default the instrumenter is set to
nil, and instrumentation will be a no-op; otherwise the query will be
instrumented via the `instrument` method and passed the relevant payload
to handle. The structure of this is deliberately similar to that of the
event emitted by ActiveRecord's `sql.active_record` notification.
  • Loading branch information
benk-gc committed Jul 5, 2024
1 parent 066cc01 commit feed35c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 4 deletions.
5 changes: 5 additions & 0 deletions lib/que/adapters/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ class ActiveRecord < Base
::PG::UnableToSend,
].freeze

def initialize(_thing = nil)
super
@instrumenter = ActiveSupport::Notifications.instrumenter
end

def checkout
checkout_activerecord_adapter { |conn| yield conn.raw_connection }
rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e
Expand Down
33 changes: 31 additions & 2 deletions lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ module Adapters
class UnavailableConnection < StandardError; end

class Base
attr_reader :instrumenter

def initialize(_thing = nil)
@prepared_statements = {}
@instrumenter = nil
end

# The only method that adapters really need to implement. Should lock a
Expand Down Expand Up @@ -62,7 +65,14 @@ def in_transaction?

def execute_sql(sql, params)
args = params.empty? ? [sql] : [sql, params]
checkout { |conn| conn.async_exec(*args) }

checkout do |conn|
log(sql, params, conn, async: true) do |notification_payload|
conn.async_exec(*args).tap do |result|
notification_payload[:row_count] = result.count
end
end
end
end

def execute_prepared(name, params)
Expand All @@ -81,7 +91,11 @@ def execute_prepared(name, params)
prepared_just_now = statements[name] = true
end

conn.exec_prepared("que_#{name}", params)
log(SQL[name], params, conn, async: false) do |notification_payload|
conn.exec_prepared("que_#{name}", params).tap do |result|
notification_payload[:row_count] = result.count
end
end
rescue ::PG::InvalidSqlStatementName => error
# Reconnections on ActiveRecord can cause the same connection
# objects to refer to new backends, so recover as well as we can.
Expand All @@ -97,6 +111,21 @@ def execute_prepared(name, params)
end
end

def log(sql, binds, conn, statement_name: nil, async: false, &block)
return yield({}) if instrumenter.nil?

instrumenter.instrument(
"que.active_record",
sql: sql,
binds: binds,
type_casted_binds: [],
async: async,
statement_name: statement_name,
connection: conn,
&block
)
end

CAST_PROCS = {
# booleans
16 => ->(value) {
Expand Down
33 changes: 31 additions & 2 deletions spec/lib/que/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
RSpec.describe Que::Job do
describe ".enqueue" do
let(:run_at) { postgres_now }
let(:job_class) do
Class.new(described_class).tap do |klass|
stub_const("FooBarJob", klass)
end
end

it "adds job to que_jobs table" do
expect { described_class.enqueue(:hello, run_at: run_at) }.
Expand Down Expand Up @@ -41,15 +46,14 @@
que_job_id: an_instance_of(Integer),
queue: "default",
priority: 100,
job_class: a_string_including("Class"),
job_class: "FooBarJob",
retryable: true,
run_at: run_at,
args: [500, "gbp", "testing"],
custom_log_1: 500,
custom_log_2: "test-log",
)

job_class = Class.new(described_class)
job_class.custom_log_context ->(attrs) {
{
custom_log_1: attrs[:args][0],
Expand All @@ -59,6 +63,31 @@
job_class.enqueue(500, :gbp, :testing, run_at: run_at)
end

it "instruments queries using ActiveSupport::Notifications" do
events = []

ActiveSupport::Notifications.subscribe("que.active_record") do |event|
events << event
end

job_class.enqueue("foobar")

expect(events).to include(
having_attributes(
name: "que.active_record",
payload: hash_including(
sql: /INSERT INTO que_jobs/,
binds: [nil, nil, nil, "FooBarJob", true, "[\"foobar\"]"],
type_casted_binds: [],
async: false,
statement_name: nil,
connection: instance_of(PG::Connection),
row_count: 1,
),
),
)
end

context "with a custom adapter specified" do
let(:custom_adapter) { Que.adapter.dup }
let(:job_with_adapter) { Class.new(described_class) }
Expand Down

0 comments on commit feed35c

Please sign in to comment.