From 75fecfe174af49cc31c288783592aa279b8e96b2 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 7 Oct 2024 12:39:33 -0700 Subject: [PATCH] add async callbacks test --- opentelemetry-sdk/src/metrics/mod.rs | 52 ++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 75f3b9cb46..4828b2d940 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1250,6 +1250,58 @@ mod tests { asynchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge"); } + async fn some_async_function() -> u64 { + std::thread::sleep(std::time::Duration::from_millis(10)); + 1 + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() { + // Run this test with stdout enabled to see output. + // cargo test async_inside_observable_callbacks --features=testing -- --nocapture + // Arrange + async_inside_observable_callback_helper(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() { + // Run this test with stdout enabled to see output. + // cargo test async_inside_observable_callbacks --features=testing -- --nocapture + // Arrange + async_inside_observable_callback_helper(); + } + + #[tokio::test(flavor = "current_thread")] + async fn async_inside_observable_callback_from_tokio_current_thread() { + // Run this test with stdout enabled to see output. + // cargo test async_inside_observable_callbacks --features=testing -- --nocapture + // Arrange + async_inside_observable_callback_helper(); + } + + fn async_inside_observable_callback_helper() { + // Run this test with stdout enabled to see output. + // cargo test async_inside_observable_callbacks --features=testing -- --nocapture + // Arrange + let mut test_context = TestContext::new(Temporality::Delta); + let _obs_gauge = test_context + .meter() + .u64_observable_gauge("my_observable_gauge") + .with_callback(|observer| { + // call async function from here + let value = futures_executor::block_on(some_async_function()); + observer.observe(value, &[]); + }) + .init(); + + test_context.flush_metrics(); + let last_value = + test_context.get_aggregation::>("my_observable_gauge", None); + assert_eq!(last_value.data_points.len(), 1); + let data_point = &last_value.data_points[0]; + assert_eq!(data_point.value, 1); + } + fn asynchronous_instruments_cumulative_with_gap_in_measurements_helper( instrument_name: &'static str, ) {