Skip to content

Commit

Permalink
Enable hyper client tests for SimpleLogProcessor integration tests (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Jan 23, 2025
1 parent 38be4d9 commit 927a08c
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 96 deletions.
212 changes: 118 additions & 94 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

use anyhow::Result;
use ctor::dtor;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::{logs as sdklogs, Resource};
use std::fs::File;
use std::io::Read;
use std::os::unix::fs::MetadataExt;
use std::time::Duration;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use uuid::Uuid;

fn init_logs(is_simple: bool) -> Result<sdklogs::LoggerProvider> {
let exporter_builder = LogExporter::builder();
Expand Down Expand Up @@ -43,25 +46,85 @@ fn init_logs(is_simple: bool) -> Result<sdklogs::LoggerProvider> {
Ok(logger_provider)
}

async fn logs_tokio_helper(is_simple: bool) -> Result<()> {
use crate::{assert_logs_results_contains, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs(is_simple).unwrap();
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
tokio::time::sleep(Duration::from_secs(5)).await;
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

fn logs_non_tokio_helper(is_simple: bool, init_logs_inside_rt: bool) -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;
let logger_provider = if init_logs_inside_rt {
// Initialize the logger provider inside the Tokio runtime
rt.block_on(async {
// Setup the collector container inside Tokio runtime
test_utils::start_collector_container().await?;
init_logs(is_simple)
})?
} else {
// Initialize the logger provider outside the Tokio runtime
rt.block_on(async {
let _ = test_utils::start_collector_container().await;
});
init_logs(is_simple)?
};

let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);

// Generate a random UUID and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(
target: "my-target",
uuid = expected_uuid,
"hello from {}. My price is {}.",
"banana",
2.99
);
}

let _ = logger_provider.shutdown();
std::thread::sleep(Duration::from_secs(5));
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

fn assert_logs_results_contains(result: &str, expected_content: &str) -> Result<()> {
let file = File::open(result)?;
let mut contents = String::new();
let mut reader = std::io::BufReader::new(&file);
reader.read_to_string(&mut contents)?;
assert!(contents.contains(expected_content));
Ok(())
}

#[cfg(test)]
mod logtests {
// TODO: The tests in this mod works like below: Emit a log with a UUID,
// The tests in this mod works like below: Emit a log with a UUID,
// then read the logs from the file and check if the UUID is present in the
// logs. This makes it easy to validate with a single collector and its
// output. This is a very simple test but good enough to validate that OTLP
// Exporter did work! A more comprehensive test would be to validate the
// entire Payload. The infra for it already exists (logs_asserter.rs), the
// TODO here is to write a test that validates the entire payload.
// Exporter did work!

use super::*;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::{fs::File, time::Duration};
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use uuid::Uuid;
use std::fs::File;

#[test]
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
Expand All @@ -87,6 +150,8 @@ mod logtests {
Ok(())
}

// Batch Processor

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub async fn logs_batch_tokio_multi_thread() -> Result<()> {
Expand All @@ -105,104 +170,63 @@ mod logtests {
logs_tokio_helper(false).await
}

async fn logs_tokio_helper(is_simple: bool) -> Result<()> {
use crate::{assert_logs_results_contains, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs(is_simple).unwrap();
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
tokio::time::sleep(Duration::from_secs(5)).await;
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_batch_non_tokio_main_init_logs_inside_rt() -> Result<()> {
logs_non_tokio_helper(false, true)
}

#[test]
#[cfg(feature = "reqwest-blocking-client")]
pub fn logs_batch_non_tokio_main_with_init_logs_outside_rt() -> Result<()> {
logs_non_tokio_helper(false, false)
}

// Simple Processor

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_simple_non_tokio_main_with_init_logs_inside_rt() -> Result<()> {
logs_non_tokio_helper(true, true)
}

#[test]
#[cfg(any(feature = "reqwest-blocking-client"))]
pub fn logs_simple_non_tokio_main_with_init_logs_outsie_rt() -> Result<()> {
logs_non_tokio_helper(true, false)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
#[cfg(any(
feature = "tonic-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
pub async fn logs_simple_tokio_multi_thread() -> Result<()> {
logs_tokio_helper(true).await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
#[cfg(any(
feature = "tonic-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> {
logs_tokio_helper(true).await
}

// Ignored, to be investigated
#[ignore]
#[ignore] // https://github.com/open-telemetry/opentelemetry-rust/issues/2539
#[tokio::test(flavor = "current_thread")]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
#[cfg(any(
feature = "tonic-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
pub async fn logs_simple_tokio_current() -> Result<()> {
logs_tokio_helper(true).await
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_batch_non_tokio_main() -> Result<()> {
logs_non_tokio_helper(false)
}

fn logs_non_tokio_helper(is_simple: bool) -> Result<()> {
// Initialize the logger provider inside a tokio runtime
// as this allows tonic client to capture the runtime,
// but actual export occurs from the dedicated std::thread
// created by BatchLogProcessor.
let rt = tokio::runtime::Runtime::new()?;
let logger_provider = rt.block_on(async {
// While we're here setup our collector container too, as this needs tokio to run
test_utils::start_collector_container().await?;
init_logs(is_simple)
})?;
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
std::thread::sleep(Duration::from_secs(5));
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_simple_non_tokio_main() -> Result<()> {
logs_non_tokio_helper(true)
}
}

pub fn assert_logs_results_contains(result: &str, expected_content: &str) -> Result<()> {
let file = File::open(result)?;
let mut contents = String::new();
let mut reader = std::io::BufReader::new(&file);
reader.read_to_string(&mut contents)?;
assert!(contents.contains(expected_content));
Ok(())
}

pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
let left = read_logs_from_json(File::open(expected)?)?;
let right = read_logs_from_json(File::open(result)?)?;

LogsAsserter::new(left, right).assert();

assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
Ok(())
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
Expand Down
3 changes: 1 addition & 2 deletions scripts/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ if [ -d "$TEST_DIR" ]; then
echo "Integration Tests: Hyper Client (Disabled now)"
echo ####
echo
# TODO: hyper client is not supported with thread based processor and reader. Enable this test once it is supported.
#cargo test --no-default-features --features "hyper-client","internal-logs"
cargo test --no-default-features --features "hyper-client","internal-logs" --test logs
else
echo "Directory $TEST_DIR does not exist. Skipping tests."
exit 1
Expand Down

0 comments on commit 927a08c

Please sign in to comment.