Modify loggerprovider shutdown, flush to return single Result and handle repeat shutdown calls #1750

Merged
Changes from 3 commits
opentelemetry-sdk/CHANGELOG.md (3 additions, 0 deletions)

@@ -31,6 +31,9 @@
 - **Breaking** [#1729](https://github.com/open-telemetry/opentelemetry-rust/pull/1729)
   - Update the return type of `TracerProvider.span_processors()` from `&Vec<Box<dyn SpanProcessor>>` to `&[Box<dyn SpanProcessor>]`.
   - Update the return type of `LoggerProvider.log_processors()` from `&Vec<Box<dyn LogProcessor>>` to `&[Box<dyn LogProcessor>]`.
+- **Breaking** [#1750](https://github.com/open-telemetry/opentelemetry-rust/pull/1750)
+  - Update the return type of `LoggerProvider.shutdown()` from `Vec<LogResult<()>>` to `LogResult<()>`.
+  - Update the return type of `LoggerProvider.force_flush()` from `Vec<LogResult<()>>` to `LogResult<()>`.
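The caller-visible effect of this change can be sketched in miniature. Instead of handing callers a `Vec<LogResult<()>>` to inspect element by element, the SDK now folds per-processor failures into a single result. The `aggregate` helper below is illustrative only (it uses `String` in place of `LogError` to stay self-contained), not SDK API:

```rust
// Illustrative sketch of the error-folding the SDK now performs internally.
// `String` stands in for LogError; the real code wraps the failures in
// LogError::Other.
fn aggregate(results: Vec<Result<(), String>>) -> Result<(), String> {
    // Keep only the failures reported by individual processors.
    let errs: Vec<String> = results.into_iter().filter_map(Result::err).collect();
    if errs.is_empty() {
        Ok(())
    } else {
        // Fold all failures into one error value.
        Err(format!("{errs:?}"))
    }
}

fn main() {
    assert!(aggregate(vec![Ok(()), Ok(())]).is_ok());
    assert!(aggregate(vec![Ok(()), Err("exporter timed out".to_string())]).is_err());
}
```

The single `Result` composes with `?` at call sites, which a `Vec` of results cannot do.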

## v0.22.1

opentelemetry-sdk/src/logs/log_emitter.rs (63 additions, 20 deletions)
@@ -4,16 +4,19 @@ use crate::{
     runtime::RuntimeChannel,
 };
 use opentelemetry::{
-    global::{self},
-    logs::LogResult,
+    global,
+    logs::{LogError, LogResult},
     trace::TraceContextExt,
     Context, InstrumentationLibrary,
 };

 #[cfg(feature = "logs_level_enabled")]
 use opentelemetry::logs::Severity;

-use std::{borrow::Cow, sync::Arc};
+use std::{
+    borrow::Cow,
+    sync::{atomic::Ordering, Arc},
+};
 use std::{sync::atomic::AtomicBool, time::SystemTime};

 use once_cell::sync::Lazy;
@@ -97,25 +100,46 @@ impl LoggerProvider {
     }

     /// Force flush all remaining logs in log processors and return results.
-    pub fn force_flush(&self) -> Vec<LogResult<()>> {
-        self.log_processors()
-            .iter()
-            .map(|processor| processor.force_flush())
-            .collect()
+    pub fn force_flush(&self) -> LogResult<()> {
+        // propagate force_flush to processors
+        let mut errs = vec![];
+        for processor in &self.inner.processors {
+            if let Err(err) = processor.force_flush() {
+                errs.push(err);
+            }
+        }
+
+        if errs.is_empty() {
+            Ok(())
+        } else {
+            Err(LogError::Other(format!("{errs:?}").into()))
+        }
     }

     /// Shuts down this `LoggerProvider`
-    pub fn shutdown(&self) -> Vec<LogResult<()>> {
-        // mark itself as already shutdown
-        self.is_shutdown
-            .store(true, std::sync::atomic::Ordering::Relaxed);
-        // propagate the shutdown signal to processors
-        // it's up to the processor to properly block new logs after shutdown
-        self.inner
-            .processors
-            .iter()
-            .map(|processor| processor.shutdown())
-            .collect()
+    pub fn shutdown(&self) -> LogResult<()> {
+        if self
+            .is_shutdown
+            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
+            .is_ok()
+        {
+            // propagate the shutdown signal to processors
+            // it's up to the processor to properly block new logs after shutdown
+            let mut errs = vec![];
+            for processor in &self.inner.processors {
+                if let Err(err) = processor.shutdown() {
+                    errs.push(err);
+                }
+            }
+
+            if errs.is_empty() {
+                Ok(())
+            } else {
+                Err(LogError::Other(format!("{errs:?}").into()))
+            }
+        } else {
+            Err(LogError::Other("logger provider already shut down".into()))
TommyCpp (Contributor) commented on May 13, 2024:

We should make a dedicated `LogError` variant for repeated shutdown. I imagine in most use cases users don't care, as long as shutdown has been invoked at least once.

Member Author replied:

Agree to fix it, though not in this PR: the current structure of placing these errors in the API is not correct and needs refactoring. Will cover it as part of #1042.

Contributor replied:

I'd mark this as release blocking and add a TODO in the code too. It will be very anti-intuitive if users have to parse the error string to understand whether the error is caused by repeated shutdown.

Member Author replied:

> I'd mark this as release blocking

Yes, for a stable or even beta release, but not for #1738, which is still alpha for logs.

Member asked:

Sorry if this is already answered: what is wrong with returning a vec of `LogResult` for shutdown? Is it only for consistency with metrics?

Member Author replied:

Yes. Also, if any individual sub-component hits an issue during shutdown, it should emit its own logging; `shutdown()` just needs to capture the overall state. But I am not very sure we are doing that correctly. The proper solution should come once we address #1146 (which is also closely related to #761, since it is hard to solve internal logging without fixing the infinite-telemetry/circular-dependency problem).

Member replied:

Ok, thanks, that makes sense.

+        }
     }
 }
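The one-time-shutdown guard in this diff can be isolated into a minimal, self-contained sketch (the `Provider` type here is hypothetical, not the SDK's): `compare_exchange` atomically flips the flag from `false` to `true` exactly once, so only the first caller runs the processor-shutdown path and every later caller gets an error instead of silently re-running it.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical provider demonstrating the compare_exchange shutdown guard.
struct Provider {
    is_shutdown: AtomicBool,
}

impl Provider {
    fn new() -> Self {
        Provider {
            is_shutdown: AtomicBool::new(false),
        }
    }

    fn shutdown(&self) -> Result<(), &'static str> {
        // compare_exchange succeeds only for the caller that observes `false`,
        // so the body below runs at most once even under concurrent calls.
        if self
            .is_shutdown
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            // (the real SDK propagates shutdown to its processors here)
            Ok(())
        } else {
            Err("provider already shut down")
        }
    }
}

fn main() {
    let p = Provider::new();
    assert!(p.shutdown().is_ok());
    assert!(p.shutdown().is_err()); // repeat call is rejected
}
```

Unlike the previous `store(true, Relaxed)`, the read-modify-write lets the provider distinguish the first shutdown from repeats, which is what makes the error return possible.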

@@ -485,6 +509,25 @@ mod tests {
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3);
}

+    #[test]
+    fn shutdown_idempotent_test() {
+        let counter = Arc::new(AtomicU64::new(0));
+        let logger_provider = LoggerProvider::builder()
+            .with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
+            .build();
+
+        let shutdown_res = logger_provider.shutdown();
+        assert!(shutdown_res.is_ok());
+
+        // Subsequent shutdowns should return an error.
+        let shutdown_res = logger_provider.shutdown();
+        assert!(shutdown_res.is_err());
+
+        // Subsequent shutdowns should return an error.
+        let shutdown_res = logger_provider.shutdown();
+        assert!(shutdown_res.is_err());
+    }

#[test]
fn global_shutdown_test() {
// cargo test shutdown_test --features=logs
@@ -508,7 +551,7 @@

         // explicitly calling shutdown on logger_provider. This will
         // indeed do the shutdown, even if there are loggers still alive.
-        logger_provider.shutdown();
+        let _ = logger_provider.shutdown();

// Assert

opentelemetry-stdout/examples/basic.rs (1 addition, 1 deletion)

@@ -107,7 +107,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
meter_provider.shutdown()?;

     #[cfg(feature = "logs")]
-    drop(logger_provider);
+    logger_provider.shutdown()?;

Ok(())
}
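The example change above swaps an implicit `drop` for an explicit, fallible `shutdown()` that propagates with `?`. A stand-alone sketch of why that matters (the types here are stand-ins, not the SDK's): `Drop` has no way to surface an error to the caller, while a `Result`-returning shutdown composes with `main`'s error handling.

```rust
// Sketch: explicit, fallible shutdown composes with `?` in main,
// whereas Drop cannot report failure. Types are illustrative stand-ins.
struct LoggerProvider;

impl LoggerProvider {
    fn shutdown(&self) -> Result<(), Box<dyn std::error::Error>> {
        // (the real SDK flushes and shuts down its processors here,
        // returning any aggregated error)
        Ok(())
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let logger_provider = LoggerProvider;
    // A shutdown error now aborts main with a nonzero exit status
    // instead of being silently swallowed by drop().
    logger_provider.shutdown()?;
    Ok(())
}
```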