Skip to content

Commit

Permalink
Fix rolling back in run_transaction when user errors occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Dec 21, 2023
1 parent 18d251e commit 0a3bb24
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 11 deletions.
13 changes: 12 additions & 1 deletion src/db/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ impl<'a> FirestoreTransaction<'a> {
Ok(())
}

pub fn finish(&mut self) -> FirestoreResult<()> {
self.finished = true;
self.transaction_span.in_scope(|| {
debug!("Transaction has been finished locally without rolling back to be able to retry it again.");
});
Ok(())
}

pub fn is_empty(&self) -> bool {
self.writes.is_empty()
}
Expand Down Expand Up @@ -216,11 +224,13 @@ impl FirestoreDb {
warn!(%err, delay = ?retry_after, "Transient error occurred in transaction function. Retrying after the specified delay.");
});
initial_backoff_duration = retry_after;
transaction.finish().ok();
}
BackoffError::Permanent(err) => {
transaction.rollback().await.ok();
return Err(FirestoreError::ErrorInTransaction(
FirestoreErrorInTransaction::new(transaction_id.clone(), Box::new(err)),
))
));
}
},
}
Expand Down Expand Up @@ -258,6 +268,7 @@ impl FirestoreDb {
);

let ret_val = func(cdb, &mut transaction).await.map_err(|backoff_err| {
transaction.finish().ok();
match backoff_err {
BackoffError::Transient { err, retry_after } => {
transaction_span.in_scope(|| {
Expand Down
25 changes: 25 additions & 0 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,28 @@ where
}
.boxed()
}

#[derive(Debug)]
pub struct CustomUserError {
details: String,
}

impl CustomUserError {
pub fn new(msg: &str) -> CustomUserError {
CustomUserError {
details: msg.to_string(),
}
}
}

impl std::fmt::Display for CustomUserError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.details)
}
}

impl std::error::Error for CustomUserError {
fn description(&self) -> &str {
&self.details
}
}
64 changes: 54 additions & 10 deletions tests/transaction-tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::common::setup;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

mod common;
use firestore::*;
Expand Down Expand Up @@ -54,17 +56,59 @@ async fn transaction_tests() -> Result<(), Box<dyn std::error::Error + Send + Sy
transaction.commit().await?;
}

let object_updated: MyTestStructure = db
.fluent()
.update()
.in_col(TEST_COLLECTION_NAME)
.precondition(FirestoreWritePrecondition::Exists(true))
.document_id(&my_struct.some_id)
.object(&my_struct.clone())
.execute()
.await?;
{
let transaction = db.begin_transaction().await?;
let db = db.clone_with_consistency_selector(FirestoreConsistencySelector::Transaction(
transaction.transaction_id.clone(),
));
let object_updated: MyTestStructure = db
.fluent()
.update()
.in_col(TEST_COLLECTION_NAME)
.precondition(FirestoreWritePrecondition::Exists(true))
.document_id(&my_struct.some_id)
.object(&my_struct.clone())
.execute()
.await?;
transaction.commit().await?;
assert_eq!(object_updated, my_struct);
}

assert_eq!(object_updated, my_struct);
// Handling permanent errors
{
let res: FirestoreResult<()> = db
.run_transaction(|_db, _tx| {
Box::pin(async move {
//Test returning an error
Err(backoff::Error::Permanent(common::CustomUserError::new(
"test error",
)))
})
})
.await;
assert!(res.is_err());
}

// Handling transient errors
{
let counter = Arc::new(AtomicUsize::new(1));
let res: FirestoreResult<()> = db
.run_transaction(|_db, _tx| {
let counter = counter.fetch_add(1, Ordering::Relaxed);
Box::pin(async move {
if counter > 2 {
return Ok(());
}
//Test returning an error
Err(backoff::Error::Transient {
err: common::CustomUserError::new("test error"),
retry_after: None,
})
})
})
.await;
assert!(res.is_ok());
}

Ok(())
}

0 comments on commit 0a3bb24

Please sign in to comment.