Skip to content

Commit

Permalink
feat: track ActiveRecord async queries
Browse files Browse the repository at this point in the history
  • Loading branch information
zvkemp committed Jan 17, 2025
1 parent 3bb36a6 commit 8b1ccd9
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
module OpenTelemetry
module Instrumentation
module ActiveRecord
QUERY_SPAN_NAME_KEY = OpenTelemetry::Context.create_key('async_query_span_name')

# The Instrumentation class contains logic to detect and install the ActiveRecord instrumentation
class Instrumentation < OpenTelemetry::Instrumentation::Base
MINIMUM_VERSION = Gem::Version.new('7')
Expand Down Expand Up @@ -39,6 +41,7 @@ def require_dependencies
require_relative 'patches/transactions_class_methods'
require_relative 'patches/validations'
require_relative 'patches/relation_persistence'
require_relative 'patches/async_query_context_propagation'
end

def patch_activerecord
Expand All @@ -55,6 +58,9 @@ def patch_activerecord
::ActiveRecord::Base.prepend(Patches::Validations)

::ActiveRecord::Relation.prepend(Patches::RelationPersistence)

::ActiveRecord::ConnectionAdapters::ConnectionPool.prepend(Patches::AsyncQueryContextPropagation)
::ActiveRecord::FutureResult.prepend(Patches::FutureResultExtensions)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Instrumentation
module ActiveRecord
QUERY_SPAN_NAME_KEY = OpenTelemetry::Context.create_key('async_query_span_name')

module Patches
# Module to prepend to ActiveRecord::ConnectionAdapters::ConnectionPool
module AsyncQueryContextPropagation
def schedule_query(future_result) # :nodoc:
context = OpenTelemetry::Context.current

@async_executor.post do
# This can happen in the request thread, in the case of a busy executor (fallback_action is executed.)
# FIXME: This override should be unecessary if the concurrent-ruby instrumentation is always installed.
OpenTelemetry::Context.with_current(context) do
future_result.execute_or_skip
end
end

Thread.pass
end
end

# Module to support otel context propagation to ActiveRecord::FutureResults
module FutureResultExtensions
OTEL_QUERY_SPAN_NAME_IVAR = :@__otel_query_span_name

def initialize(...)
super

if (query_span_name = OpenTelemetry::Context.current.value(QUERY_SPAN_NAME_KEY))
instance_variable_set(OTEL_QUERY_SPAN_NAME_IVAR, query_span_name)
end
end

private

def execute_query(connection, async: false)
name = instance_variable_get(OTEL_QUERY_SPAN_NAME_IVAR) || @args[1] || 'execute_query'
Instrumentation.instance.tracer.in_span(name, attributes: { 'async' => async }) do
super
end
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ module ClassMethods
method_name = ::ActiveRecord.version >= Gem::Version.new('7.0.0') ? :_query_by_sql : :find_by_sql

define_method(method_name) do |*args, **kwargs, &block|
tracer.in_span("#{self} query") do
super(*args, **kwargs, &block)
query_span_name = "#{self} query"
OpenTelemetry::Context.with_value(QUERY_SPAN_NAME_KEY, query_span_name) do
tracer.in_span(kwargs[:async] ? "schedule #{query_span_name}" : query_span_name) do
super(*args, **kwargs, &block)
end
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

require_relative '../../../../lib/opentelemetry/instrumentation/active_record'
require_relative '../../../../lib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation'

ASYNC_TEST_LOGGER = Logger.new($stdout).tap { |logger| logger.level = Logger::WARN }

describe OpenTelemetry::Instrumentation::ActiveRecord::Patches::AsyncQueryContextPropagation do
let(:exporter) { EXPORTER }
let(:unfiltered_spans) { exporter.finished_spans }
let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveRecord::Instrumentation.instance }
let(:logger) { ASYNC_TEST_LOGGER }

before do
exporter.reset
setup_asynchronous_queries_session
User.create!
end

after do
teardown_asynchronous_queries_session

ActiveRecord::Base.subclasses.each do |model|
model.connection.truncate(model.table_name)
end
end

def setup_asynchronous_queries_session
@_async_queries_session = ActiveRecord::Base.asynchronous_queries_tracker.start_session
end

def teardown_asynchronous_queries_session
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session(true) if @_async_queries_session
end

def run_async_load
logger.debug('>>> run async load')
in_new_trace do
instrumentation.tracer.in_span('test_wrapper') do
if block_given?
yield
else
users = User.all.load_async
sleep(0.5)
logger.debug('>>> async #to_a')
users.to_a
end
end
end
end

def in_new_trace(&block)
OpenTelemetry::Context.with_current(OpenTelemetry::Context::ROOT, &block)
end

def spans
test_wrapper_span = unfiltered_spans.find { |span| span.name == 'test_wrapper' }
unfiltered_spans.select { |span| span.trace_id == test_wrapper_span.trace_id }
end

def span_names
spans.map(&:name).sort
end

# call with block_queue: true to completely block the executor (no tasks can be enqueued),
# or with block_queue: false to block the workers only (tasks still accepted in the queue)
def with_busy_executor(block_queue: true)
_(ActiveRecord.async_query_executor).must_equal :global_thread_pool

mutex = Mutex.new
condvar = ConditionVariable.new
executor = ActiveRecord.instance_variable_get(:@global_thread_pool_async_query_executor)

task_count = executor.max_length
task_count += executor.max_queue if block_queue

awaiting_signals = (0...task_count).to_a

# Fill up the max thread count and queue with tasks that
# will never complete until they are signaled to do so.
task_count.times do |n|
executor.post do
mutex.synchronize do
ASYNC_TEST_LOGGER.debug("task #{n} waiting...")
condvar.wait(mutex)
ASYNC_TEST_LOGGER.debug("task #{n} got the signal")
awaiting_signals.delete(n)
end
end
end

logger.debug("yielding... block_queue=#{block_queue}")
yield
logger.debug('...done!')
ensure
logger.debug('cleaning up...')
# clean up the queue
mutex.synchronize { condvar.signal } until awaiting_signals.empty?
end

def current_thread_id
Thread.current.object_id
end

def execute_query_span
spans.find { |span| span.name == 'User query' }
end

it 'async_query' do
run_async_load

_(span_names).must_equal(['test_wrapper', 'User query', 'schedule User query'].sort)
_(execute_query_span.attributes['__test_only_thread_id']).wont_equal(current_thread_id)
_(execute_query_span.attributes['async']).must_equal(true)
end

describe 'no executor' do
before do
@async_query_executor_was = ActiveRecord.async_query_executor
ActiveRecord.async_query_executor = nil
end

after do
ActiveRecord.async_query_executor = @async_query_executor_was
end

it 'is not actually async' do
run_async_load # sic

_(spans.map(&:name)).wont_include('Schedule User query')
_(spans.map(&:name)).must_include('User query')

user_query = spans.find { |span| span.name == 'User query' }
_(user_query.attributes['async']).must_equal(false) if user_query.attributes.key?('async')
_(span_names).must_equal(['User query', 'test_wrapper'].sort)
end
end

it 'async_query_blocked_executor' do
with_busy_executor { run_async_load }

# In this case the wrapped task is executed as the 'fallback_action' by the thread pool executor,
# so we get the async span, even though it is not actually async.
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id)

skip(<<~SKIP)
`async` _should_ be false here, but it's executed as a fallback action and
is incorrectly set to `true`.
Whether or not this is actually an issue is up for debate;
it's true that the query would have been async if the global pool load was lower,
so it could be said that the benefit of attempting to enqueue the task
is measured in degrees, ranging from no benefit to saving the entire time of the query.
However, the _other_ scenario in which the task is enqueued but not yet worked on
causes `async` to be false.
Ultimately, however, this is a bug in Rails's instrumentation around async queries,
so it doesn't feel particularly pressing to solve it here with a bunch of
otherwise unecessary patches.
SKIP

_(execute_query_span.attributes['async']).must_equal(false)
end

it 'async_query_slow_executor' do
# executor accepts task, but doesn't fulfill it before the waiter
with_busy_executor(block_queue: false) do
run_async_load
end

# When #to_a is called, the query is still pending and hasn't been picked up,
# so AR executes is synchronously. The executor task is cancelled (or should be?),
# so this span won't be here.
_(execute_query_span.attributes['async']).must_equal(false)
_(span_names).must_equal(['User query', 'schedule User query', 'test_wrapper'])
end

it 'async_query_no_wait' do
run_async_load do
User.all.load_async.to_a
end

# here we called #to_a inline, so it executed before the async scheduler
# could assign the task to a worker. I'm not sure this test will always pass.
_(execute_query_span.attributes['async']).must_equal(false)
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id)
end

it 'async_count' do
run_async_load do
count = User.async_count
sleep(0.5)
count.value
end

count_span = spans.find { |span| span.name == 'User Count' }
_(count_span.attributes['async']).must_equal(true)
end

it 'works with concurrent queries' do
Account.create!

run_async_load do
users = User.all.load_async
accounts = Account.all.load_async

sleep(0.5)

users.to_a
accounts.to_a
end

user_schedule_span = spans.find { |span| span.name == 'schedule User query' }
account_schedule_span = spans.find { |span| span.name == 'schedule Account query' }
user_query_span = spans.find { |span| span.name == 'User query' }
account_query_span = spans.find { |span| span.name == 'Account query' }
test_wrapper_span = spans.find { |span| span.name == 'test_wrapper' }

_(user_schedule_span.parent_span_id).must_equal(test_wrapper_span.span_id)
_(account_schedule_span.parent_span_id).must_equal(test_wrapper_span.span_id)

_(user_query_span.parent_span_id).must_equal(user_schedule_span.span_id)
_(account_query_span.parent_span_id).must_equal(account_schedule_span.span_id)

_(user_query_span.attributes['async']).must_equal(true)
_(account_query_span.attributes['async']).must_equal(true)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
let(:spans) { exporter.finished_spans }

before { exporter.reset }

after do
ActiveRecord::Base.subclasses.each do |model|
model.connection.truncate(model.table_name)
Expand Down
14 changes: 14 additions & 0 deletions instrumentation/active_record/test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
ActiveRecord::Base.logger = logger
ActiveRecord::Migration.verbose = false

ActiveRecord.async_query_executor = :global_thread_pool

ActiveRecord::Base.establish_connection(
adapter: 'sqlite3',
database: 'db/development.sqlite3'
Expand Down Expand Up @@ -84,3 +86,15 @@ def change
end

Minitest.after_run { CreateUserTable.migrate(:down) }

# Used in async tests to determine what thread spawned which span
module SpanThreadIdTracking
def internal_start_span(name, kind, attributes, links, start_timestamp, parent_context, instrumentation_scope) # rubocop: disable Metrics/ParameterLists
attributes ||= {}
attributes['__test_only_thread_id'] = Thread.current.object_id

super
end
end

OpenTelemetry::SDK::Trace::TracerProvider.prepend(SpanThreadIdTracking)

0 comments on commit 8b1ccd9

Please sign in to comment.