Skip to content

Commit

Permalink
app_persistency: small async refactor
Browse files Browse the repository at this point in the history
The 'sync_all()' call on Drop of the AppPersistency camouflages the
error messages in case of a critical error. This makes debugging more
hard. By making the 'sync_all()' call synchronous, the problem is
solved. This has minimal to no drawbacks since the filesystem calls inside the
function do not benefit rusts async scheduling.
  • Loading branch information
svenrademakers committed Jul 10, 2024
1 parent a44ff68 commit c071311
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 30 deletions.
40 changes: 15 additions & 25 deletions src/persistency/app_persistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use std::time::Duration;
use super::binary_persistency::PersistencyStore;
use anyhow::Context;
use futures::future::Either;
use tokio::fs::{File, OpenOptions};
use tokio::runtime::Handle;
use tokio::fs::OpenOptions;
use tokio::time::sleep_until;
const BIN_DATA: &str = "/var/lib/bmcd/bmcd.bin";

Expand Down Expand Up @@ -79,20 +78,19 @@ struct MonitorContext {
}

impl MonitorContext {
pub async fn commit_to_file(&self) -> anyhow::Result<MonitorEvent> {
pub fn commit_to_file(&self) -> anyhow::Result<MonitorEvent> {
tracing::debug!("commiting persistency to disk");
let mut new = self.file.clone();
new.set_extension("new");

let pending = OpenOptions::new()
let pending = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&new)
.await?;
self.inner.write(pending.into_std().await).await?;
.open(&new)?;
self.inner.write(pending)?;

tokio::fs::rename(&new, &self.file).await.with_context(|| {
std::fs::rename(&new, &self.file).with_context(|| {
format!(
"error writing persistency binary. backup available at: {}",
new.to_string_lossy()
Expand All @@ -102,11 +100,11 @@ impl MonitorContext {
Ok(MonitorEvent::PersistencyWritten)
}

pub async fn sync_all(&self) -> anyhow::Result<()> {
pub fn sync_all(&self) -> anyhow::Result<()> {
if self.inner.is_dirty() {
self.commit_to_file().await?;
let file = File::open(&self.file).await?;
file.sync_all().await?;
self.commit_to_file()?;
let file = std::fs::File::open(&self.file)?;
file.sync_all()?;
tracing::info!("persistency synced");
}
Ok(())
Expand Down Expand Up @@ -210,7 +208,7 @@ impl ApplicationPersistency {
if !write_timeout.is_zero() {
sleep_until(new_deadline).await;
}
clone.commit_to_file().await
clone.commit_to_file()
})
}
};
Expand All @@ -230,16 +228,9 @@ impl Deref for ApplicationPersistency {

impl Drop for ApplicationPersistency {
fn drop(&mut self) {
let context = self.context.clone();

// block_on is used to make sure that the async code gets executed
// in place, preventing omission of the task during a tokio shutdown.
// Which would happen if it was scheduled with `tokio::spawn`.
Handle::current().block_on(async move {
if let Err(e) = context.sync_all().await {
tracing::error!("{}", e);
}
});
if let Err(e) = self.context.sync_all() {
tracing::error!("{}", e);
}
}
}

Expand Down Expand Up @@ -291,8 +282,7 @@ mod tests {
.unwrap();

persistency
.write(File::create(&bin_file).await.unwrap().into_std().await)
.await
.write(std::fs::File::create(&bin_file).unwrap())
.unwrap();

for n in 0..6u128 {
Expand Down
10 changes: 5 additions & 5 deletions src/persistency/binary_persistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ impl<'a> PersistencyStore {
watcher
}

pub(super) async fn write(
pub(super) fn write(
&self,
mut source: impl Write + Seek + 'a,
) -> Result<(), PersistencyError<'a>> {
let cache = self.cache.read().await;
let cache = self.cache.blocking_read();
let data = bincode::serialize(&cache.deref().0)
.map_err(|e| PersistencyError::SerializationError("data serialization".into(), e))?;

Expand Down Expand Up @@ -284,8 +284,8 @@ mod tests {
));
}

#[tokio::test]
async fn test_write_data() {
#[test]
fn test_write_data() {
let mut data = HashMap::<u64, Vec<u8>>::new();
let mut header = PersistencyHeader::new().unwrap();
data.insert(default_hash("test"), bincode::serialize(&222u128).unwrap());
Expand All @@ -302,7 +302,7 @@ mod tests {

let buffer = [0u8; 123];
let mut write_cursor = Cursor::new(buffer);
store.write(&mut write_cursor).await.unwrap();
store.write(&mut write_cursor).unwrap();
assert_eq!(
cursor.get_ref()[..cursor.position() as usize],
write_cursor.get_ref()[..write_cursor.position() as usize]
Expand Down

0 comments on commit c071311

Please sign in to comment.