diff --git a/CHANGELOG.md b/CHANGELOG.md index af894952c11..3870ca84e02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. - [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks - [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently. +- [2233](https://github.com/FuelLabs/fuel-core/pull/2233): Introduce a new column `modification_history_v2` for storing the modification history in the historical rocksDB. Keys in this column are stored in big endian order. Changed the behaviour of the historical rocksDB to write changes for new block heights to the new column, and to perform lookup of values from the `modification_history_v2` table first, and then from the `modification_history` table, performing a migration upon access if necessary. #### Breaking - [2389](https://github.com/FuelLabs/fuel-core/pull/2258): Updated the `messageProof` GraphQL schema to return a non-nullable `MessageProof`. @@ -90,7 +91,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/). #### Breaking - [2311](https://github.com/FuelLabs/fuel-core/pull/2311): Changed the text of the error returned by the executor if gas overflows. - ## [Version 0.38.0] ### Added diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index 52887fdaf2e..48335ed9a39 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -14,7 +14,6 @@ use crate::{ Column, Historical, }, - modifications_history::ModificationsHistory, view_at_height::ViewAtHeight, }, iterable_key_value_view::IterableKeyValueViewWrapper, @@ -49,8 +48,13 @@ use fuel_core_storage::{ Error as StorageError, Result as StorageResult, StorageAsMut, + StorageMut, }; use itertools::Itertools; +use modifications_history::{ + ModificationsHistoryV1, + ModificationsHistoryV2, +}; use serde::{ Deserialize, Serialize, @@ -77,9 +81,12 @@ pub enum StateRewindPolicy { RewindRange { size: NonZeroU64 }, } +/// Implementation of a database #[derive(Debug)] pub struct HistoricalRocksDB { + /// The [`StateRewindPolicy`] used by the historical rocksdb state_rewind_policy: StateRewindPolicy, + /// The Description of the database. db: RocksDb>, } @@ -167,7 +174,6 @@ where // we need to apply all modifications up to `X + 1`. let rollback_height = height.as_u64().saturating_add(1); - // TODO: May fail incorrectly because of https://github.com/FuelLabs/fuel-core/issues/2095 let Some(oldest_height) = self.oldest_changes_height()? else { return Err(DatabaseError::NoHistoryIsAvailable.into()); }; @@ -183,11 +189,16 @@ where Ok(ViewAtHeight::new(rollback_height, latest_view)) } - fn store_modifications_history( + fn store_modifications_history( &self, - storage_transaction: &mut StorageTransaction<&RocksDb>>, + storage_transaction: &mut StorageTransaction, height: &Description::Height, - ) -> StorageResult<()> { + ) -> StorageResult<()> + where + T: KeyValueInspect>, + { + let modifications_history_migration_in_progress = self.is_migration_in_progress(); + if self.state_rewind_policy == StateRewindPolicy::NoRewind { return Ok(()); } @@ -196,11 +207,22 @@ where let reverse_changes = self.reverse_history_changes(storage_transaction.changes())?; - cleanup_old_changes(&height_u64, storage_transaction, &self.state_rewind_policy)?; + cleanup_old_changes( + &height_u64, + storage_transaction, + &self.state_rewind_policy, + modifications_history_migration_in_progress, + )?; - let old_changes = storage_transaction - .storage_as_mut::>() - .replace(&height_u64, &reverse_changes)?; + // We write directly to `ModificationsHistoryV2`. + // If the migration is in progress, we fallback to taking from + // `ModificationsHistoryV1` when no old_changes for `ModificationsHistoryV2` are found. + let old_changes = multiversion_replace( + storage_transaction, + height_u64, + &reverse_changes, + modifications_history_migration_in_progress, + )?; if let Some(old_changes) = old_changes { tracing::warn!( @@ -248,28 +270,60 @@ where Ok(()) } - fn oldest_changes_height(&self) -> StorageResult> { - let oldest_height = self + fn multiversion_changes_heights( + &self, + direction: IterDirection, + check_v1: bool, + ) -> (Option>, Option>) { + let v2_changes = self .db - .iter_all_keys::>(Some( - IterDirection::Forward, - )) - .next() - .transpose()?; + .iter_all_keys::>(Some(direction)) + .next(); + let v1_changes = check_v1 + .then(|| { + self.db + .iter_all_keys::>(Some(direction)) + .next() + }) + .flatten(); + + (v2_changes, v1_changes) + } + + fn oldest_changes_height(&self) -> StorageResult> { + let modifications_history_migration_in_progress = self.is_migration_in_progress(); + + let (v2_oldest_height, v1_oldest_height) = self.multiversion_changes_heights( + IterDirection::Forward, + modifications_history_migration_in_progress, + ); + + let v2_oldest_height = v2_oldest_height.transpose()?; + let v1_oldest_height = v1_oldest_height.transpose()?; + + let oldest_height = match (v1_oldest_height, v2_oldest_height) { + (None, v2) => v2, + (v1, None) => v1, + (Some(v1), Some(v2)) => Some(v1.min(v2)), + }; Ok(oldest_height) } #[cfg(test)] - // TODO: This method doesn't work properly because of - // https://github.com/FuelLabs/fuel-core/issues/2095 fn rollback_last_block(&self) -> StorageResult { - let latest_height = self - .db - .iter_all_keys::>(Some( - IterDirection::Reverse, - )) - .next() - .ok_or(DatabaseError::ReachedEndOfHistory)??; + let modifications_history_migration_in_progress = self.is_migration_in_progress(); + + let (v2_latest_height, v1_latest_height) = self.multiversion_changes_heights( + IterDirection::Reverse, + modifications_history_migration_in_progress, + ); + + let latest_height = match (v2_latest_height, v1_latest_height) { + (None, None) => Err(DatabaseError::ReachedEndOfHistory)?, + (Some(Ok(v1)), Some(Ok(v2))) => v1.max(v2), + (_, Some(v1_res)) => v1_res?, + (Some(v2_res), _) => v2_res?, + }; self.rollback_block_to(latest_height)?; @@ -279,10 +333,12 @@ where fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> { let mut storage_transaction = self.db.read_transaction(); - let last_changes = storage_transaction - .storage_as_mut::>() - .take(&height_to_rollback)? - .ok_or(not_found!(ModificationsHistory))?; + let last_changes = multiversion_take( + &mut storage_transaction, + height_to_rollback, + self.is_migration_in_progress(), + )? + .ok_or(not_found!(ModificationsHistoryV1))?; remove_historical_modifications( &height_to_rollback, @@ -302,15 +358,92 @@ where Ok(()) } + + fn v1_entries(&self) -> BoxedIter> { + self.db + .iter_all::>(None) + } + + fn is_migration_in_progress(&self) -> bool { + self.v1_entries().next().is_some() + } +} + +// Try to take the value from `ModificationsHistoryV2` first. +// If the migration is still in progress, remove the value from +// `ModificationsHistoryV1` and return it if no value for `ModificationsHistoryV2` +// was found. This is necessary to avoid scenarios where it is possible to +// roll back twice to the same block height +fn multiversion_op( + storage_transaction: &mut StorageTransaction, + height: u64, + modifications_history_migration_in_progress: bool, + f: F, +) -> StorageResult> +where + Description: DatabaseDescription, + T: KeyValueInspect>, + F: FnOnce( + StorageMut<'_, StorageTransaction, ModificationsHistoryV2>, + ) -> StorageResult>, +{ + let v2_last_changes = + f(storage_transaction.storage_as_mut::>())?; + + if modifications_history_migration_in_progress { + let v1_last_changes = storage_transaction + .storage_as_mut::>() + .take(&height)?; + Ok(v2_last_changes.or(v1_last_changes)) + } else { + Ok(v2_last_changes) + } +} + +fn multiversion_take( + storage_transaction: &mut StorageTransaction, + height: u64, + modifications_history_migration_in_progress: bool, +) -> StorageResult> +where + Description: DatabaseDescription, + T: KeyValueInspect>, +{ + multiversion_op( + storage_transaction, + height, + modifications_history_migration_in_progress, + |storage| storage.take(&height), + ) } -fn cleanup_old_changes( +fn multiversion_replace( + storage_transaction: &mut StorageTransaction, + height: u64, + changes: &Changes, + modifications_history_migration_in_progress: bool, +) -> StorageResult> +where + Description: DatabaseDescription, + T: KeyValueInspect>, +{ + multiversion_op( + storage_transaction, + height, + modifications_history_migration_in_progress, + |storage| storage.replace(&height, changes), + ) +} + +fn cleanup_old_changes( height: &u64, - storage_transaction: &mut StorageTransaction<&RocksDb>>, + storage_transaction: &mut StorageTransaction, state_rewind_policy: &StateRewindPolicy, + modifications_history_migration_in_progress: bool, ) -> StorageResult<()> where Description: DatabaseDescription, + T: KeyValueInspect>, { match state_rewind_policy { StateRewindPolicy::NoRewind => { @@ -322,9 +455,11 @@ where StateRewindPolicy::RewindRange { size } => { let old_height = height.saturating_sub(size.get()); - let old_changes = storage_transaction - .storage_as_mut::>() - .take(&old_height)?; + let old_changes = multiversion_take( + storage_transaction, + old_height, + modifications_history_migration_in_progress, + )?; if let Some(old_changes) = old_changes { remove_historical_modifications( @@ -338,13 +473,14 @@ where Ok(()) } -fn remove_historical_modifications( +fn remove_historical_modifications( old_height: &u64, - storage_transaction: &mut StorageTransaction<&RocksDb>>, + storage_transaction: &mut StorageTransaction, reverse_changes: &Changes, ) -> StorageResult<()> where Description: DatabaseDescription, + T: KeyValueInspect>, { let changes = reverse_changes .iter() @@ -443,12 +579,14 @@ where ) -> StorageResult<()> { let mut storage_transaction = StorageTransaction::transaction(&self.db, ConflictPolicy::Overwrite, changes); + if let Some(height) = height { self.store_modifications_history(&mut storage_transaction, &height)?; } self.db .commit_changes(&storage_transaction.into_changes())?; + Ok(()) } @@ -726,7 +864,7 @@ mod tests { .unwrap(); let entries = historical_rocks_db .db - .iter_all::>(None) + .iter_all::>(None) .collect::>(); assert_eq!(entries.len(), 1); @@ -737,11 +875,153 @@ mod tests { assert_eq!(result, Ok(1)); let entries = historical_rocks_db .db - .iter_all::>(None) + .iter_all::>(None) .collect::>(); assert_eq!(entries.len(), 0); } + #[test] + fn state_rewind_policy__rewind_range_1__rollback_uses_v2() { + // Given + let rocks_db = RocksDb::>::default_open_temp(None).unwrap(); + let historical_rocks_db = HistoricalRocksDB::new( + rocks_db, + StateRewindPolicy::RewindRange { + size: NonZeroU64::new(1).unwrap(), + }, + ) + .unwrap(); + + // When + let mut transaction = historical_rocks_db.read_transaction(); + transaction + .storage_as_mut::() + .insert(&key(), &123) + .unwrap(); + historical_rocks_db + .commit_changes(Some(1u32.into()), transaction.into_changes()) + .unwrap(); + let v2_entries = historical_rocks_db + .db + .iter_all::>(None) + .collect::>(); + let v1_entries = historical_rocks_db + .db + .iter_all::>(None) + .collect::>(); + + // Then + assert_eq!(v2_entries.len(), 1); + assert_eq!(v1_entries.len(), 0); + } + + #[test] + fn state_rewind_policy__rewind_range_1__rollback_during_migration_works() { + // Given + let rocks_db = RocksDb::>::default_open_temp(None).unwrap(); + let historical_rocks_db = HistoricalRocksDB::new( + rocks_db, + StateRewindPolicy::RewindRange { + size: NonZeroU64::new(1).unwrap(), + }, + ) + .unwrap(); + + // When + let mut transaction = historical_rocks_db.read_transaction(); + transaction + .storage_as_mut::() + .insert(&key(), &123) + .unwrap(); + historical_rocks_db + .commit_changes(Some(1u32.into()), transaction.into_changes()) + .unwrap(); + + // Migrate the changes from V2 to V1. + + let mut migration_transaction = StorageTransaction::transaction( + &historical_rocks_db.db, + ConflictPolicy::Overwrite, + Changes::default(), + ); + + let v2_changes = migration_transaction + .storage_as_mut::>() + .take(&1u64) + .unwrap() + .unwrap(); + migration_transaction + .storage_as_mut::>() + .insert(&1u64, &v2_changes) + .unwrap(); + + historical_rocks_db + .db + .commit_changes(&migration_transaction.into_changes()) + .unwrap(); + + // Check that the history has indeed been written to V1 + let v2_entries = historical_rocks_db + .db + .iter_all::>(None) + .collect::>(); + let v1_entries = historical_rocks_db + .db + .iter_all::>(None) + .collect::>(); + + assert_eq!(v2_entries.len(), 0); + assert_eq!(v1_entries.len(), 1); + + let result = historical_rocks_db.rollback_last_block(); + + // Then + assert_eq!(result, Ok(1)); + let v2_entries = historical_rocks_db + .db + .iter_all::>(None) + .collect::>(); + let v1_entries = historical_rocks_db + .db + .iter_all::>(None) + .collect::>(); + assert_eq!(v2_entries.len(), 0); + assert_eq!(v1_entries.len(), 0); + } + + #[test] + fn rollback_last_block_works_with_v2() { + // Given + let rocks_db = RocksDb::>::default_open_temp(None).unwrap(); + + let historical_rocks_db = + HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap(); + + // When + // Commit 1000 blocks + for i in 1..=1000u32 { + let mut transaction = historical_rocks_db.read_transaction(); + transaction + .storage_as_mut::() + .insert(&key(), &(123 + i as u64)) + .unwrap(); + historical_rocks_db + .commit_changes(Some(i.into()), transaction.into_changes()) + .unwrap(); + } + // We can now rollback the last block 1000 times. + let results: Vec> = (0..1000u32) + .map(|_| historical_rocks_db.rollback_last_block()) + .collect(); + + // Then + // If the rollback fails at some point, then we have unintentionally rollbacked to + // a block that was not the last. + for (i, result) in results.iter().enumerate() { + assert_eq!(result, &Ok(1000 - i as u64)); + } + } + #[test] fn state_rewind_policy__rewind_range_1__second_rollback_fails() { // Given @@ -806,7 +1086,7 @@ mod tests { // Given let entries = historical_rocks_db .db - .iter_all::>(None) + .iter_all::>(None) .collect::>(); assert_eq!(entries.len(), height); @@ -817,7 +1097,7 @@ mod tests { assert_eq!(result, Ok(height as u64)); let entries = historical_rocks_db .db - .iter_all::>(None) + .iter_all::>(None) .collect::>(); assert_eq!(entries.len(), height - 1); } diff --git a/crates/fuel-core/src/state/historical_rocksdb/description.rs b/crates/fuel-core/src/state/historical_rocksdb/description.rs index 03283967f06..ab46a400936 100644 --- a/crates/fuel-core/src/state/historical_rocksdb/description.rs +++ b/crates/fuel-core/src/state/historical_rocksdb/description.rs @@ -2,6 +2,9 @@ use crate::database::database_description::DatabaseDescription; use fuel_core_storage::kv_store::StorageColumn; pub const HISTORY_COLUMN_ID: u32 = u32::MAX / 2; +// Avoid conflicts with HistoricalDuplicateColumn indexes, which are +// in decreasing order starting from HISTORY_COLUMN_ID - 1. +pub const HISTORY_V2_COLUMN_ID: u32 = HISTORY_COLUMN_ID + 1; #[derive(Debug, Copy, Clone, enum_iterator::Sequence)] pub enum Column @@ -10,7 +13,10 @@ where { OriginalColumn(Description::Column), HistoricalDuplicateColumn(Description::Column), + /// Original history column HistoryColumn, + /// Migrated history column + HistoryV2Column, } impl strum::EnumCount for Column @@ -19,7 +25,7 @@ where { const COUNT: usize = Description::Column::COUNT /* original columns */ + Description::Column::COUNT /* duplicated columns */ - + 1 /* history column */; + + 2 /* history column, history V2 column */; } impl StorageColumn for Column @@ -33,6 +39,7 @@ where format!("history_{}", c.name()) } Column::HistoryColumn => "modifications_history".to_string(), + Column::HistoryV2Column => "modifications_history_v2".to_string(), } } @@ -43,6 +50,7 @@ where historical_duplicate_column_id(c.id()) } Column::HistoryColumn => HISTORY_COLUMN_ID, + Column::HistoryV2Column => HISTORY_V2_COLUMN_ID, } } } @@ -81,7 +89,7 @@ where Column::HistoricalDuplicateColumn(c) => { Some(Description::prefix(c).unwrap_or(0).saturating_add(8)) // `u64::to_be_bytes` } - Column::HistoryColumn => Some(8), + Column::HistoryColumn | Column::HistoryV2Column => Some(8), } } } @@ -102,8 +110,8 @@ mod tests { let variants = enum_iterator::all::>().collect::>(); let original = ::Column::COUNT; let duplicated = ::Column::COUNT; - let history_modification = 1; - let expected_count = original + duplicated + history_modification; + let history_modification_versions = 2; + let expected_count = original + duplicated + history_modification_versions; assert_eq!(variants.len(), expected_count); assert_eq!( as EnumCount>::COUNT, expected_count); } @@ -113,8 +121,8 @@ mod tests { let variants = enum_iterator::all::>().collect::>(); let original = ::Column::COUNT; let duplicated = ::Column::COUNT; - let history_modification = 1; - let expected_count = original + duplicated + history_modification; + let history_modification_versions = 2; + let expected_count = original + duplicated + history_modification_versions; assert_eq!(variants.len(), expected_count); assert_eq!( as EnumCount>::COUNT, expected_count); } @@ -124,8 +132,8 @@ mod tests { let variants = enum_iterator::all::>().collect::>(); let original = ::Column::COUNT; let duplicated = ::Column::COUNT; - let history_modification = 1; - let expected_count = original + duplicated + history_modification; + let history_modification_versions = 2; + let expected_count = original + duplicated + history_modification_versions; assert_eq!(variants.len(), expected_count); assert_eq!( as EnumCount>::COUNT, expected_count); } diff --git a/crates/fuel-core/src/state/historical_rocksdb/modifications_history.rs b/crates/fuel-core/src/state/historical_rocksdb/modifications_history.rs index f5cfecb4b30..fada2e13ed5 100644 --- a/crates/fuel-core/src/state/historical_rocksdb/modifications_history.rs +++ b/crates/fuel-core/src/state/historical_rocksdb/modifications_history.rs @@ -3,18 +3,30 @@ use crate::{ state::historical_rocksdb::description::Column, }; use fuel_core_storage::{ + self, blueprint::plain::Plain, - codec::postcard::Postcard, + codec::{ + postcard::Postcard, + primitive::Primitive, + }, structured_storage::TableWithBlueprint, transactional::Changes, Mappable, }; -pub struct ModificationsHistory(core::marker::PhantomData) +/// Versioned modification history. The `const VERSION: usize` generic parameter +/// is used to specify the version. +/// This allows to define different [`TableWithBlueprint`] +/// implementations for V1 and V2 of the modification history. +pub struct ModificationsHistoryVersion( + core::marker::PhantomData, +) where Description: DatabaseDescription; -impl Mappable for ModificationsHistory +/// [`ModificationsHistoryVersion`] keys and values for all versions. +impl Mappable + for ModificationsHistoryVersion where Description: DatabaseDescription, { @@ -26,12 +38,17 @@ where type OwnedValue = Self::Value; } -impl TableWithBlueprint for ModificationsHistory +pub type ModificationsHistoryV1 = + ModificationsHistoryVersion; +pub type ModificationsHistoryV2 = + ModificationsHistoryVersion; + +/// Blueprint for Modifications History V1. Keys are stored in little endian order +/// using the `Column::HistoryColumn` column family. +impl TableWithBlueprint for ModificationsHistoryV1 where Description: DatabaseDescription, { - // TODO: The Blueprint should be `Plain, Postcard>` to sort - // the keys in the database. https://github.com/FuelLabs/fuel-core/issues/2095 type Blueprint = Plain; type Column = Column; @@ -39,3 +56,17 @@ where Column::HistoryColumn } } + +/// Blueprint for Modifications History V2. Keys are stored in big endian order +/// using the `Column::HistoryColumnV2` column family. +impl TableWithBlueprint for ModificationsHistoryV2 +where + Description: DatabaseDescription, +{ + type Blueprint = Plain, Postcard>; + type Column = Column; + + fn column() -> Self::Column { + Column::HistoryV2Column + } +}