Skip to content

Commit

Permalink
[#460] Use mpsc channel for file logger
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Oct 15, 2024
1 parent 5df7d29 commit fa4a525
Showing 1 changed file with 39 additions and 66 deletions.
105 changes: 39 additions & 66 deletions iceoryx2-bb/log/src/logger/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,23 @@
// iceoryx2_pal_posix platform. In this case, the logger needs to use the low-level calls directly
// to avoid a circular dependency with iceoryx2_bb_posix.
use std::{
collections::VecDeque,
fmt::Debug,
fs::OpenOptions,
io::Write,
sync::{Arc, Condvar, Mutex},
sync::{mpsc::Sender, Arc},
thread::JoinHandle,
time::{Duration, Instant, SystemTime},
};

use std::sync::mpsc::channel;

use crate::{get_log_level, LogLevel};

enum Message {
Entry(Entry),
Stop,
}

struct Entry {
timestamp: Duration,
elapsed_time: Duration,
Expand All @@ -61,78 +67,43 @@ impl Debug for Entry {
}
}

struct State {
buffer: VecDeque<Entry>,
keep_running: bool,
}

impl Default for State {
fn default() -> Self {
Self {
buffer: VecDeque::new(),
keep_running: true,
}
}
}

/// A logger that logs all messages into a file. It implements an active object pattern. A
/// background thread waits on a queue of log messages and whenever a new message is added.
pub struct Logger {
state: Arc<Mutex<State>>,
trigger: Arc<Condvar>,
sender: Arc<Sender<Message>>,
start_time: Instant,
_background_thread: Arc<Option<JoinHandle<()>>>,
}

impl Logger {
/// Creates a new file logger.
pub fn new(file_name: &str) -> Self {
let state = Arc::new(Mutex::new(State::default()));
let trigger = Arc::new(Condvar::new());
let mut file = OpenOptions::new()
.append(true)
.create(true)
.open(file_name)
.expect("Open log file for writing.");

let self_state = state.clone();
let self_trigger = trigger.clone();
let (sender, receiver) = channel();

let write_buffer_to_file = move || loop {
let lock = state.lock().expect("Acquire internal state mutex.");

let lock = trigger
.wait_while(lock, |state: &mut State| {
state.buffer.is_empty() || !state.keep_running
})
.expect("Wait for internal state trigger.");
drop(lock);

loop {
// acquiring the lock only to get the next buffer to minimize contention as much
// as possible.
let mut lock = state.lock().expect("Acquire internal state mutex.");
let buffer = lock.buffer.pop_front();
drop(lock);

match buffer {
Some(entry) => file
.write_all(format!("{:?}\n", entry).as_bytes())
.expect("Writing log message into log file."),
None => break,
}
}
match receiver.recv() {
Ok(Message::Entry(entry)) => file
.write_all(format!("{:?}\n", entry).as_bytes())
.expect("Writing log message into log file."),
Ok(Message::Stop) => break,
Err(e) => file
.write_all(
format!("[This should never happen!] File Logger got error: {:?}", e)
.as_bytes(),
)
.expect("Write log message into log file."),
};
file.sync_all().expect("Sync log file with disc.");

let lock = state.lock().expect("Acquire internal state mutex.");
if !lock.keep_running {
break;
}
};

Self {
state: self_state,
trigger: self_trigger,
sender: Arc::new(sender),
_background_thread: Arc::new(Some(std::thread::spawn(write_buffer_to_file))),
start_time: Instant::now(),
}
Expand All @@ -141,9 +112,9 @@ impl Logger {

impl Drop for Logger {
fn drop(&mut self) {
let mut state = self.state.lock().expect("Acquire internal state mutex.");
state.keep_running = false;
self.trigger.notify_one();
self.sender
.send(Message::Stop)
.expect("Send stop notification to background thread.");
}
}

Expand All @@ -158,16 +129,18 @@ impl crate::Logger for Logger {
return;
}

let mut state = self.state.lock().expect("Acquire internal state mutex.");
state.buffer.push_back(Entry {
log_level,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Acquire current system time."),
elapsed_time: self.start_time.elapsed(),
origin: origin.to_string(),
message: formatted_message.to_string(),
});
self.trigger.notify_one();
self.sender
.send({
Message::Entry(Entry {
log_level,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Acquire current system time."),
elapsed_time: self.start_time.elapsed(),
origin: origin.to_string(),
message: formatted_message.to_string(),
})
})
.expect("Send log message to log thread.");
}
}

0 comments on commit fa4a525

Please sign in to comment.