Better handling of shutdown in BatchLogProcessor #2581

Merged (2 commits) on Jan 31, 2025

Changes from all commits
234 changes: 139 additions & 95 deletions opentelemetry-sdk/src/logs/log_processor.rs
@@ -270,7 +270,6 @@
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,
export_log_message_sent: Arc<AtomicBool>,
current_batch_size: Arc<AtomicUsize>,
max_export_batch_size: usize,
@@ -292,87 +291,112 @@

impl LogProcessor for BatchLogProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
otel_warn!(
name: "BatchLogProcessor.Emit.ProcessorShutdown",
message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
);
return;
}

let result = self
.logs_sender
.try_send(Box::new((record.clone(), instrumentation.clone())));

if result.is_err() {
// Increment dropped logs count. The first time we have to drop a log,
// emit a warning.
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
}
return;
}

// At this point, sending the log record to the data channel was successful.
// Increment the current batch size and check if it has reached the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
{
// Check if the a control message for exporting logs is already sent to the worker thread.
// If not, send a control message to export logs.
// `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.

if !self.export_log_message_sent.load(Ordering::Relaxed) {
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
// Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
// We could have used compare_exchange as well here, but it's more verbose than swap.
if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportLog(
self.export_log_message_sent.clone(),
)) {
Ok(_) => {
// Control message sent successfully.
}
Err(_err) => {
// TODO: Log error
// If the control message could not be sent, reset the `export_log_message_sent` flag.
self.export_log_message_sent.store(false, Ordering::Relaxed);
// Match on the result and handle each case separately
match result {
Ok(_) => {
// Successfully sent the log record to the data channel.
// Increment the current batch size and check if it has reached
// the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
>= self.max_export_batch_size
{
// Check if a control message for exporting logs is
// already sent to the worker thread. If not, send a control
// message to export logs. `export_log_message_sent` is set
// to false ONLY when the worker thread has processed the
// control message.

if !self.export_log_message_sent.load(Ordering::Relaxed) {

// This is a cost-efficient check as atomic load
// operations do not require exclusive access to cache
// line. Perform atomic swap to
// `export_log_message_sent` ONLY when the atomic load
// operation above returns false. Atomic
// swap/compare_exchange operations require exclusive
// access to cache line on most processor architectures.
// We could have used compare_exchange as well here, but
// it's more verbose than swap.
if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportLog(
self.export_log_message_sent.clone(),
)) {
Ok(_) => {
// Control message sent successfully.
}
Err(_err) => {
// TODO: Log error. If the control message
// could not be sent, reset the
// `export_log_message_sent` flag.
self.export_log_message_sent.store(false, Ordering::Relaxed);
}

}
}
}
}
}
Err(mpsc::TrySendError::Full(_)) => {
// Increment dropped logs count. The first time we have to drop
// a log, emit a warning.
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
}

}
Err(mpsc::TrySendError::Disconnected(_)) => {
// Given the background thread is the only receiver and it is
// disconnected, this indicates the thread has shut down
otel_warn!(
name: "BatchLogProcessor.Emit.AfterShutdown",
message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in application. Logs will not be exported."
);
}
}
}
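The load-then-swap gate described in the comments above can be sketched in isolation. This is a minimal illustration of the pattern, not the SDK's actual code; `try_claim_export` is a hypothetical helper name:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Returns true only for the caller that wins the right to send the export
// control message. The cheap relaxed `load` keeps the hot path read-only;
// the `swap`, which needs exclusive cache-line access on most
// architectures, runs only when the load suggests the flag is still clear.
fn try_claim_export(flag: &AtomicBool) -> bool {
    if !flag.load(Ordering::Relaxed) {
        return !flag.swap(true, Ordering::Relaxed);
    }
    false
}

fn main() {
    let flag = AtomicBool::new(false);
    assert!(try_claim_export(&flag)); // first caller claims the flag
    assert!(!try_claim_export(&flag)); // later callers back off
    flag.store(false, Ordering::Relaxed); // worker resets after processing
    assert!(try_claim_export(&flag)); // the gate is reusable
}
```

Losing the race is harmless here: the winner's control message drains the same queue, so a lost claim just means the export is already scheduled.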

fn force_flush(&self) -> LogResult<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return LogResult::Err(LogError::Other(
"BatchLogProcessor is already shutdown".into(),
));
}
let (sender, receiver) = mpsc::sync_channel(1);
self.message_sender
match self
.message_sender
.try_send(BatchMessage::ForceFlush(sender))
.map_err(|err| LogError::Other(err.into()))?;

receiver
.recv_timeout(self.forceflush_timeout)
.map_err(|err| {
if err == RecvTimeoutError::Timeout {
LogError::ExportTimedOut(self.forceflush_timeout)
} else {
LogError::Other(err.into())
}
})?
{
Ok(_) => receiver
.recv_timeout(self.forceflush_timeout)
.map_err(|err| {
if err == RecvTimeoutError::Timeout {
LogError::ExportTimedOut(self.forceflush_timeout)

} else {
LogError::Other(err.into())

}
})?,
Err(mpsc::TrySendError::Full(_)) => {
// If the control message could not be sent, emit a warning.
otel_debug!(
name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call."
lalitb (Member) commented on Jan 31, 2025:
I think this can also happen if the user repeatedly calls shutdown from different threads, and the first shutdown message is not yet processed, so the channel is still intact :)

Author replied:
That is a good point. Let me modify the error message to include that.

Author replied:
Will do in a separate PR to keep this merged ASAP.
);
LogResult::Err(LogError::Other("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call.".into()))

}
Err(mpsc::TrySendError::Disconnected(_)) => {
// Given the background thread is the only receiver and it is
// disconnected, this indicates the thread has shut down
otel_debug!(
name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates an incorrect lifecycle management in the application."
);


LogResult::Err(LogError::Other(
"ForceFlush cannot be performed as BatchLogProcessor is already shutdown"
.into(),
))

}
}
}
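The `force_flush` handshake above — `try_send` a control message carrying a reply channel, then wait with a bounded `recv_timeout` — can be sketched on its own. This is a simplified model under assumed names (`Ctrl`, `force_flush`), not the SDK's real types:

```rust
use std::sync::mpsc::{self, RecvTimeoutError, TrySendError};
use std::thread;
use std::time::Duration;

// Hypothetical control message: the worker acknowledges on the embedded
// channel once it has processed the flush.
enum Ctrl {
    Flush(mpsc::SyncSender<()>),
}

fn force_flush(ctrl: &mpsc::SyncSender<Ctrl>, timeout: Duration) -> Result<(), String> {
    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    match ctrl.try_send(Ctrl::Flush(reply_tx)) {
        // Message queued: wait (bounded) for the worker's acknowledgement.
        Ok(_) => reply_rx.recv_timeout(timeout).map_err(|e| match e {
            RecvTimeoutError::Timeout => "flush timed out".to_string(),
            other => other.to_string(),
        }),
        // Control channel full: a previous flush/shutdown is still pending.
        Err(TrySendError::Full(_)) => Err("control channel full".to_string()),
        // Receiver gone: the worker thread has already shut down.
        Err(TrySendError::Disconnected(_)) => Err("worker already shut down".to_string()),
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(4);
    let worker = thread::spawn(move || {
        if let Ok(Ctrl::Flush(done)) = rx.recv() {
            let _ = done.try_send(()); // acknowledge, then exit
        }
    });
    assert!(force_flush(&tx, Duration::from_secs(1)).is_ok());
    worker.join().unwrap();
    // Receiver is gone now: this maps to the AlreadyShutdown branch.
    assert!(force_flush(&tx, Duration::from_secs(1)).is_err());
}
```

Note how `try_send` (rather than a blocking `send`) is what makes the Full and Disconnected cases observable without ever blocking the caller — the same property the PR relies on to replace the `is_shutdown` flag.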

fn shutdown(&self) -> LogResult<()> {
// Set is_shutdown to true
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);

let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_logs > 0 {
@@ -385,35 +409,56 @@
}

let (sender, receiver) = mpsc::sync_channel(1);
self.message_sender
.try_send(BatchMessage::Shutdown(sender))
.map_err(|err| LogError::Other(err.into()))?;

receiver
.recv_timeout(self.shutdown_timeout)
.map(|_| {
// join the background thread after receiving back the shutdown signal
if let Some(handle) = self.handle.lock().unwrap().take() {
handle.join().unwrap();
}
LogResult::Ok(())
})
.map_err(|err| match err {
RecvTimeoutError::Timeout => {
otel_error!(
name: "BatchLogProcessor.Shutdown.Timeout",
message = "BatchLogProcessor shutdown timing out."
);
LogError::ExportTimedOut(self.shutdown_timeout)
}
_ => {
otel_error!(
name: "BatchLogProcessor.Shutdown.Error",
error = format!("{}", err)
);
LogError::Other(err.into())
}
})?
match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
Ok(_) => {
receiver
.recv_timeout(self.shutdown_timeout)
.map(|_| {
// join the background thread after receiving back the
// shutdown signal
if let Some(handle) = self.handle.lock().unwrap().take() {
handle.join().unwrap();
}
LogResult::Ok(())
})
.map_err(|err| match err {
RecvTimeoutError::Timeout => {
otel_error!(
name: "BatchLogProcessor.Shutdown.Timeout",
message = "BatchLogProcessor shutdown timing out."
);
LogError::ExportTimedOut(self.shutdown_timeout)

}
_ => {
otel_error!(

name: "BatchLogProcessor.Shutdown.Error",
error = format!("{}", err)

);
LogError::Other(err.into())

}
})?
}
Err(mpsc::TrySendError::Full(_)) => {
// If the control message could not be sent, emit a warning.
otel_debug!(
name: "BatchLogProcessor.Shutdown.ControlChannelFull",
message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls shutdown without finishing the previous call."
);
LogResult::Err(LogError::Other("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedly calls shutdown without finishing the previous call.".into()))

}
Err(mpsc::TrySendError::Disconnected(_)) => {
// Given the background thread is the only receiver and it is
// disconnected, this indicates the thread has shut down
otel_debug!(
name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
message = "Shutdown is being invoked more than once. This is a no-op, but indicates a potential issue in the application's lifecycle management."
);


LogResult::Err(LogError::Other(
"BatchLogProcessor is already shutdown".into(),
))

}
}
}
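The new `shutdown` path sequences three steps: send a shutdown message carrying an ack channel, wait (bounded) for the ack, and only then join the worker thread. A stripped-down sketch of that handshake, with hypothetical names (`Processor`, `ctrl`) and none of the SDK's real types:

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

struct Processor {
    // Control channel: the payload is the ack sender for this shutdown.
    ctrl: mpsc::SyncSender<mpsc::SyncSender<()>>,
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    shutdown_timeout: Duration,
}

impl Processor {
    fn shutdown(&self) -> Result<(), String> {
        let (ack_tx, ack_rx) = mpsc::sync_channel(1);
        // try_send never blocks: Full and Disconnected surface as errors,
        // which is how a second shutdown call fails instead of hanging.
        self.ctrl.try_send(ack_tx).map_err(|e| e.to_string())?;
        match ack_rx.recv_timeout(self.shutdown_timeout) {
            Ok(_) => {
                // Join only after the ack, so the join cannot block past
                // the worker's shutdown processing.
                if let Some(h) = self.handle.lock().unwrap().take() {
                    h.join().map_err(|_| "worker panicked".to_string())?;
                }
                Ok(())
            }
            Err(RecvTimeoutError::Timeout) => Err("shutdown timed out".to_string()),
            Err(e) => Err(e.to_string()),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<mpsc::SyncSender<()>>(1);
    let handle = thread::spawn(move || {
        if let Ok(ack) = rx.recv() {
            let _ = ack.try_send(()); // ack, then let the thread exit
        }
    });
    let p = Processor {
        ctrl: tx,
        handle: Mutex::new(Some(handle)),
        shutdown_timeout: Duration::from_secs(5),
    };
    assert!(p.shutdown().is_ok());
    // Second shutdown: the receiver is gone, so try_send errors out
    // immediately -- the behavior the removed is_shutdown flag used to fake.
    assert!(p.shutdown().is_err());
}
```

This is why the PR can delete the `is_shutdown: AtomicBool` field entirely: the disconnected state of the control channel already encodes "worker has shut down", with no separate flag to keep in sync.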

fn set_resource(&self, resource: &Resource) {
@@ -590,7 +635,6 @@
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
is_shutdown: AtomicBool::new(false),
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
export_log_message_sent: Arc::new(AtomicBool::new(false)),