-
Notifications
You must be signed in to change notification settings - Fork 182
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: track ActiveRecord async queries
- Loading branch information
Showing
6 changed files
with
303 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
54 changes: 54 additions & 0 deletions
54
...ib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# frozen_string_literal: true | ||
|
||
# Copyright The OpenTelemetry Authors | ||
# | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
module OpenTelemetry | ||
module Instrumentation | ||
module ActiveRecord | ||
QUERY_SPAN_NAME_KEY = OpenTelemetry::Context.create_key('async_query_span_name') | ||
|
||
module Patches | ||
# Module to prepend to ActiveRecord::ConnectionAdapters::ConnectionPool | ||
module AsyncQueryContextPropagation | ||
def schedule_query(future_result) # :nodoc: | ||
context = OpenTelemetry::Context.current | ||
|
||
@async_executor.post do | ||
# This can happen in the request thread, in the case of a busy executor (fallback_action is executed.) | ||
# FIXME: This override should be unecessary if the concurrent-ruby instrumentation is always installed. | ||
OpenTelemetry::Context.with_current(context) do | ||
future_result.execute_or_skip | ||
end | ||
end | ||
|
||
Thread.pass | ||
end | ||
end | ||
|
||
# Module to support otel context propagation to ActiveRecord::FutureResults | ||
module FutureResultExtensions | ||
OTEL_QUERY_SPAN_NAME_IVAR = :@__otel_query_span_name | ||
|
||
def initialize(...) | ||
super | ||
|
||
if (query_span_name = OpenTelemetry::Context.current.value(QUERY_SPAN_NAME_KEY)) | ||
instance_variable_set(OTEL_QUERY_SPAN_NAME_IVAR, query_span_name) | ||
end | ||
end | ||
|
||
private | ||
|
||
def execute_query(connection, async: false) | ||
name = instance_variable_get(OTEL_QUERY_SPAN_NAME_IVAR) || @args[1] || 'execute_query' | ||
Instrumentation.instance.tracer.in_span(name, attributes: { 'async' => async }) do | ||
super | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
223 changes: 223 additions & 0 deletions
223
...record/test/instrumentation/active_record/patches/async_query_context_propagation_test.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
# frozen_string_literal: true | ||
|
||
# Copyright The OpenTelemetry Authors | ||
# | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
require 'test_helper' | ||
|
||
require_relative '../../../../lib/opentelemetry/instrumentation/active_record' | ||
require_relative '../../../../lib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation' | ||
|
||
ASYNC_TEST_LOGGER = Logger.new($stdout).tap { |logger| logger.level = Logger::WARN } | ||
|
||
describe OpenTelemetry::Instrumentation::ActiveRecord::Patches::AsyncQueryContextPropagation do | ||
let(:exporter) { EXPORTER } | ||
let(:unfiltered_spans) { exporter.finished_spans } | ||
let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveRecord::Instrumentation.instance } | ||
let(:logger) { ASYNC_TEST_LOGGER } | ||
|
||
before do | ||
exporter.reset | ||
setup_asynchronous_queries_session | ||
User.create! | ||
end | ||
|
||
after do | ||
teardown_asynchronous_queries_session | ||
|
||
ActiveRecord::Base.subclasses.each do |model| | ||
model.connection.truncate(model.table_name) | ||
end | ||
end | ||
|
||
def setup_asynchronous_queries_session | ||
@_async_queries_session = ActiveRecord::Base.asynchronous_queries_tracker.start_session | ||
end | ||
|
||
def teardown_asynchronous_queries_session | ||
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session(true) if @_async_queries_session | ||
end | ||
|
||
def run_async_load | ||
logger.debug('>>> run async load') | ||
in_new_trace do | ||
instrumentation.tracer.in_span('test_wrapper') do | ||
if block_given? | ||
yield | ||
else | ||
users = User.all.load_async | ||
sleep(0.5) | ||
logger.debug('>>> async #to_a') | ||
users.to_a | ||
end | ||
end | ||
end | ||
end | ||
|
||
def in_new_trace(&block) | ||
OpenTelemetry::Context.with_current(OpenTelemetry::Context::ROOT, &block) | ||
end | ||
|
||
def spans | ||
test_wrapper_span = unfiltered_spans.find { |span| span.name == 'test_wrapper' } | ||
unfiltered_spans.select { |span| span.trace_id == test_wrapper_span.trace_id } | ||
end | ||
|
||
def span_names | ||
spans.map(&:name).sort | ||
end | ||
|
||
# call with block_queue: true to completely block the executor (no tasks can be enqueued), | ||
# or with block_queue: false to block the workers only (tasks still accepted in the queue) | ||
def with_busy_executor(block_queue: true) | ||
_(ActiveRecord.async_query_executor).must_equal :global_thread_pool | ||
|
||
mutex = Mutex.new | ||
condvar = ConditionVariable.new | ||
executor = ActiveRecord.instance_variable_get(:@global_thread_pool_async_query_executor) | ||
|
||
task_count = executor.max_length | ||
task_count += executor.max_queue if block_queue | ||
|
||
awaiting_signals = (0...task_count).to_a | ||
|
||
# Fill up the max thread count and queue with tasks that | ||
# will never complete until they are signaled to do so. | ||
task_count.times do |n| | ||
executor.post do | ||
mutex.synchronize do | ||
ASYNC_TEST_LOGGER.debug("task #{n} waiting...") | ||
condvar.wait(mutex) | ||
ASYNC_TEST_LOGGER.debug("task #{n} got the signal") | ||
awaiting_signals.delete(n) | ||
end | ||
end | ||
end | ||
|
||
logger.debug("yielding... block_queue=#{block_queue}") | ||
yield | ||
logger.debug('...done!') | ||
ensure | ||
logger.debug('cleaning up...') | ||
# clean up the queue | ||
mutex.synchronize { condvar.signal } until awaiting_signals.empty? | ||
end | ||
|
||
def current_thread_id | ||
Thread.current.object_id | ||
end | ||
|
||
def execute_query_span | ||
spans.find { |span| span.name == 'User query' } | ||
end | ||
|
||
it 'async_query' do | ||
run_async_load | ||
|
||
_(span_names).must_equal(['test_wrapper', 'User query', 'schedule User query'].sort) | ||
_(execute_query_span.attributes['__test_only_thread_id']).wont_equal(current_thread_id) | ||
_(execute_query_span.attributes['async']).must_equal(true) | ||
end | ||
|
||
describe 'no executor' do | ||
before do | ||
@async_query_executor_was = ActiveRecord.async_query_executor | ||
ActiveRecord.async_query_executor = nil | ||
end | ||
|
||
after do | ||
ActiveRecord.async_query_executor = @async_query_executor_was | ||
end | ||
|
||
it 'is not actually async' do | ||
run_async_load # sic | ||
|
||
_(spans.map(&:name)).wont_include('Schedule User query') | ||
_(spans.map(&:name)).must_include('User query') | ||
|
||
user_query = spans.find { |span| span.name == 'User query' } | ||
_(user_query.attributes['async']).must_equal(false) if user_query.attributes.key?('async') | ||
_(span_names).must_equal(['User query', 'test_wrapper'].sort) | ||
end | ||
end | ||
|
||
it 'async_query_blocked_executor' do | ||
with_busy_executor { run_async_load } | ||
|
||
# In this case the wrapped task is executed as the 'fallback_action' by the thread pool executor, | ||
# so we get the async span, even though it is not actually async. | ||
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id) | ||
|
||
skip(<<~SKIP) | ||
FIXME: async _should_ be false here, but it's executed as a fallback action and | ||
is incorrectly set to `true` | ||
SKIP | ||
|
||
_(execute_query_span.attributes['async']).must_equal(false) | ||
end | ||
|
||
it 'async_query_slow_executor' do | ||
# executor accepts task, but doesn't fulfill it before the waiter | ||
with_busy_executor(block_queue: false) do | ||
run_async_load | ||
end | ||
|
||
# When #to_a is called, the query is still pending and hasn't been picked up, | ||
# so AR executes is synchronously. The executor task is cancelled (or should be?), | ||
# so this span won't be here. | ||
_(execute_query_span.attributes['async']).must_equal(false) | ||
_(span_names).must_equal(['User query', 'schedule User query', 'test_wrapper']) | ||
end | ||
|
||
it 'async_query_no_wait' do | ||
run_async_load do | ||
User.all.load_async.to_a | ||
end | ||
|
||
# here we called #to_a inline, so it executed before the async scheduler | ||
# could assign the task to a worker. I'm not sure this test will always pass. | ||
_(execute_query_span.attributes['async']).must_equal(false) | ||
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id) | ||
end | ||
|
||
it 'async_count' do | ||
run_async_load do | ||
count = User.async_count | ||
sleep(0.5) | ||
count.value | ||
end | ||
|
||
count_span = spans.find { |span| span.name == 'User Count' } | ||
_(count_span.attributes['async']).must_equal(true) | ||
end | ||
|
||
it 'works with concurrent queries' do | ||
Account.create! | ||
|
||
run_async_load do | ||
users = User.all.load_async | ||
accounts = Account.all.load_async | ||
|
||
sleep(0.5) | ||
|
||
users.to_a | ||
accounts.to_a | ||
end | ||
|
||
user_schedule_span = spans.find { |span| span.name == 'schedule User query' } | ||
account_schedule_span = spans.find { |span| span.name == 'schedule Account query' } | ||
user_query_span = spans.find { |span| span.name == 'User query' } | ||
account_query_span = spans.find { |span| span.name == 'Account query' } | ||
test_wrapper_span = spans.find { |span| span.name == 'test_wrapper' } | ||
|
||
_(user_schedule_span.parent_span_id).must_equal(test_wrapper_span.span_id) | ||
_(account_schedule_span.parent_span_id).must_equal(test_wrapper_span.span_id) | ||
|
||
_(user_query_span.parent_span_id).must_equal(user_schedule_span.span_id) | ||
_(account_query_span.parent_span_id).must_equal(account_schedule_span.span_id) | ||
|
||
_(user_query_span.attributes['async']).must_equal(true) | ||
_(account_query_span.attributes['async']).must_equal(true) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters