diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index f1c597d8427..8ac051e357a 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -994,6 +994,8 @@ where #[allow(non_snake_case)] #[allow(clippy::cast_possible_truncation)] mod tests { + use std::collections::HashMap; + use super::*; use crate::database::database_description::on_chain::OnChain; use fuel_core_storage::{ @@ -1558,6 +1560,110 @@ mod tests { .is_migration_in_progress()); } + #[tokio::test] + async fn historical_rocksdb_migration_and_rollbacks_no_lost_updates() { + // Given + let rocks_db = RocksDb::>::default_open_temp(None).unwrap(); + + let historical_rocks_db = + InnerHistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange) + .unwrap(); + + // Commit 1000 blocks + for i in 1..=1000u32 { + let mut transaction = historical_rocks_db.read_transaction(); + transaction + .storage_as_mut::() + .insert(&key(), &(123 + i as u64)) + .unwrap(); + historical_rocks_db + .commit_changes(Some(i.into()), transaction.into_changes()) + .unwrap(); + } + + let mut revert_migration_transaction = StorageTransaction::transaction( + &historical_rocks_db.db, + ConflictPolicy::Overwrite, + Changes::default(), + ); + + // Revert the modification history from v2 to v1 + historical_rocks_db + .db + .iter_all::>(None) + .map(Result::unwrap) + .for_each(|(height, changes)| { + revert_migration_transaction + .storage_as_mut::>() + .insert(&height, &changes) + .unwrap(); + revert_migration_transaction + .storage_as_mut::>() + .remove(&height) + .unwrap(); + }); + + historical_rocks_db + .db + .commit_changes(&revert_migration_transaction.into_changes()) + .unwrap(); + + let v1_changes_before_migration: HashMap<_, _> = historical_rocks_db + .db + .iter_all::>(None) + .map(Result::unwrap) + .collect(); + + let v2_changes_count = historical_rocks_db + .db + .iter_all::>(None) + .count(); + + assert_eq!(v1_changes_before_migration.len(), 1000); + assert_eq!(v2_changes_count, 0); + + // When + // Wrap the inner historical rocksdb in a new instance to start the migration. + let historical_rocks_db_with_migration = + HistoricalRocksDB::try_from(historical_rocks_db).unwrap(); + + // Keep writing to the database until the migration is complete. + while historical_rocks_db_with_migration + .inner + .is_migration_in_progress() + { + if let Err(_) = historical_rocks_db_with_migration + .inner + .rollback_last_block() + { + // If rolling back fails, then cumulative changes are not being committed to rocksDB. + // We flush them the DB to keep the migration going. + // In a real scenario, the migration will progress because we always advance the height + // by committing new blocks. + historical_rocks_db_with_migration + .inner + .commit_migration_changes() + .unwrap(); + } + } + // Then + assert!(!historical_rocks_db_with_migration + .inner + .is_migration_in_progress()); + + let v2_changes: HashMap<_, _> = historical_rocks_db_with_migration + .inner + .db + .iter_all::>(None) + .map(Result::unwrap) + .collect(); + + // Check that all the keys that have not been rolled back are consistent with V1 + for (column, changes) in v2_changes { + assert_eq!(changes, *v1_changes_before_migration.get(&column).unwrap()); + } + } + #[test] fn state_rewind_policy__rewind_range_1__second_rollback_fails() { // Given