Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument adapter queries for external programs. #105

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/que/adapters/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ class ActiveRecord < Base
::PG::UnableToSend,
].freeze

def initialize(_thing = nil)
super
@instrumenter = ActiveSupport::Notifications.instrumenter
end

def checkout
checkout_activerecord_adapter { |conn| yield conn.raw_connection }
rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e
Expand Down
36 changes: 34 additions & 2 deletions lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ module Adapters
class UnavailableConnection < StandardError; end

class Base
attr_reader :instrumenter

def initialize(_thing = nil)
@prepared_statements = {}
@instrumenter = nil
end

# The only method that adapters really need to implement. Should lock a
Expand Down Expand Up @@ -62,7 +65,14 @@ def in_transaction?

def execute_sql(sql, params)
args = params.empty? ? [sql] : [sql, params]
checkout { |conn| conn.async_exec(*args) }

checkout do |conn|
log(sql, conn, params, async: true) do |notification_payload|
conn.async_exec(*args).tap do |result|
notification_payload[:row_count] = result.count
end
end
end
end

def execute_prepared(name, params)
Expand All @@ -81,7 +91,11 @@ def execute_prepared(name, params)
prepared_just_now = statements[name] = true
end

conn.exec_prepared("que_#{name}", params)
log(SQL[name], conn, params, async: false) do |notification_payload|
conn.exec_prepared("que_#{name}", params).tap do |result|
notification_payload[:row_count] = result.count
end
end
rescue ::PG::InvalidSqlStatementName => e
# Reconnections on ActiveRecord can cause the same connection
# objects to refer to new backends, so recover as well as we can.
Expand All @@ -97,6 +111,24 @@ def execute_prepared(name, params)
end
end

# rubocop:disable Metrics/ParameterLists
def log(sql, conn, binds = [], type_casted_binds = [], name = "SQL", statement_name = nil, async: false, &block)
return yield({}) if instrumenter.nil?

instrumenter.instrument(
"que.execute",
sql: sql,
name: name,
binds: binds,
type_casted_binds: type_casted_binds,
async: async,
statement_name: statement_name,
connection: conn,
&block
)
end
# rubocop:enable Metrics/ParameterLists

CAST_PROCS = {
# booleans
16 => ->(value) {
Expand Down
34 changes: 32 additions & 2 deletions spec/lib/que/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
RSpec.describe Que::Job do
describe ".enqueue" do
let(:run_at) { postgres_now }
let(:job_class) do
Class.new(described_class).tap do |klass|
stub_const("FooBarJob", klass)
end
end

it "adds job to que_jobs table" do
expect { described_class.enqueue(:hello, run_at: run_at) }.
Expand Down Expand Up @@ -41,15 +46,14 @@
que_job_id: an_instance_of(Integer),
queue: "default",
priority: 100,
job_class: a_string_including("Class"),
job_class: "FooBarJob",
retryable: true,
run_at: run_at,
args: [500, "gbp", "testing"],
custom_log_1: 500,
custom_log_2: "test-log",
)

job_class = Class.new(described_class)
job_class.custom_log_context ->(attrs) {
{
custom_log_1: attrs[:args][0],
Expand All @@ -59,6 +63,32 @@
job_class.enqueue(500, :gbp, :testing, run_at: run_at)
end

it "instruments queries using ActiveSupport::Notifications" do
events = []

ActiveSupport::Notifications.subscribe("que.execute") do |event|
events << event
end

job_class.enqueue("foobar")

expect(events).to include(
having_attributes(
name: "que.execute",
payload: hash_including(
sql: /INSERT INTO que_jobs/,
name: "SQL",
binds: [nil, nil, nil, "FooBarJob", true, "[\"foobar\"]"],
type_casted_binds: [],
async: false,
statement_name: nil,
connection: instance_of(PG::Connection),
row_count: 1,
),
),
)
end

context "with a custom adapter specified" do
let(:custom_adapter) { Que.adapter.dup }
let(:job_with_adapter) { Class.new(described_class) }
Expand Down