From 7eb333aad55411cc015ee00591e88e6bd8da08f0 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 30 Oct 2023 10:07:12 +0200 Subject: [PATCH 1/6] Sketch in-memory block storage --- node/libs/storage/Cargo.toml | 7 +- node/libs/storage/src/in_memory.rs | 137 +++++++++++++++++++++++++++++ node/libs/storage/src/lib.rs | 2 + 3 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 node/libs/storage/src/in_memory.rs diff --git a/node/libs/storage/Cargo.toml b/node/libs/storage/Cargo.toml index 52edcaa0..9629ee2b 100644 --- a/node/libs/storage/Cargo.toml +++ b/node/libs/storage/Cargo.toml @@ -10,7 +10,7 @@ license.workspace = true anyhow.workspace = true async-trait.workspace = true rand.workspace = true -rocksdb.workspace = true +rocksdb = { workspace = true, optional = true } thiserror.workspace = true tracing.workspace = true @@ -23,3 +23,8 @@ assert_matches.workspace = true tempfile.workspace = true test-casing.workspace = true tokio.workspace = true + +[features] +default = [] +# Enables RocksDB-based storage. +rocksdb = ["dep:rocksdb"] diff --git a/node/libs/storage/src/in_memory.rs b/node/libs/storage/src/in_memory.rs new file mode 100644 index 00000000..009be3c3 --- /dev/null +++ b/node/libs/storage/src/in_memory.rs @@ -0,0 +1,137 @@ +//! In-memory storage implementation. + +use crate::{ + traits::{BlockStore, WriteBlockStore}, + types::MissingBlockNumbers, + StorageResult, +}; +use async_trait::async_trait; +use concurrency::{ + ctx, + sync::{self, watch, Mutex}, +}; +use roles::validator::{BlockNumber, FinalBlock}; +use std::{collections::BTreeMap, ops}; + +#[derive(Debug)] +struct BlocksInMemoryStore { + blocks: BTreeMap, + last_contiguous_block_number: BlockNumber, +} + +impl BlocksInMemoryStore { + fn head_block(&self) -> &FinalBlock { + self.blocks.values().next_back().unwrap() + // ^ `unwrap()` is safe by construction; the storage contains at least the genesis block + } + + fn first_block(&self) -> &FinalBlock { + self.blocks.values().next().unwrap() + // ^ `unwrap()` is safe by construction; the storage contains at least the genesis block + } + + fn block(&self, number: BlockNumber) -> Option<&FinalBlock> { + self.blocks.get(&number) + } + + fn missing_block_numbers(&self, range: ops::Range) -> Vec { + let existing_numbers = self + .blocks + .range(range.clone()) + .map(|(&number, _)| Ok(number)); + MissingBlockNumbers::new(range, existing_numbers) + .map(Result::unwrap) + .collect() + } + + fn put_block(&mut self, block: FinalBlock) { + let block_number = block.header.number; + tracing::debug!("Inserting block #{block_number} into database"); + if let Some(prev_block) = self.blocks.insert(block_number, block) { + tracing::debug!(?prev_block, "Block #{block_number} is overwritten"); + } else { + for (&number, _) in self + .blocks + .range(self.last_contiguous_block_number.next()..) + { + let expected_block_number = self.last_contiguous_block_number.next(); + if number == expected_block_number { + self.last_contiguous_block_number = expected_block_number; + } else { + return; + } + } + } + } +} + +/// In-memory store. +#[derive(Debug)] +pub struct InMemoryStore { + blocks: Mutex, + blocks_sender: watch::Sender, +} + +impl InMemoryStore { + /// Creates a new store containing only the specified `genesis_block`. + pub fn new(genesis_block: FinalBlock) -> Self { + let genesis_block_number = genesis_block.header.number; + Self { + blocks: Mutex::new(BlocksInMemoryStore { + blocks: BTreeMap::from([(genesis_block_number, genesis_block)]), + last_contiguous_block_number: genesis_block_number, + }), + blocks_sender: watch::channel(genesis_block_number).0, + } + } +} + +#[async_trait] +impl BlockStore for InMemoryStore { + async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { + Ok(sync::lock(ctx, &self.blocks).await?.head_block().clone()) + } + + async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { + Ok(sync::lock(ctx, &self.blocks).await?.first_block().clone()) + } + + async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { + Ok(sync::lock(ctx, &self.blocks) + .await? + .last_contiguous_block_number) + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: BlockNumber, + ) -> StorageResult> { + Ok(sync::lock(ctx, &self.blocks).await?.block(number).cloned()) + } + + async fn missing_block_numbers( + &self, + ctx: &ctx::Ctx, + range: ops::Range, + ) -> StorageResult> { + Ok(sync::lock(ctx, &self.blocks) + .await? + .missing_block_numbers(range)) + } + + fn subscribe_to_block_writes(&self) -> watch::Receiver { + self.blocks_sender.subscribe() + } +} + +#[async_trait] +impl WriteBlockStore for InMemoryStore { + async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { + sync::lock(ctx, &self.blocks) + .await? + .put_block(block.clone()); + self.blocks_sender.send_replace(block.header.number); + Ok(()) + } +} diff --git a/node/libs/storage/src/lib.rs b/node/libs/storage/src/lib.rs index 44d23342..a08b3e82 100644 --- a/node/libs/storage/src/lib.rs +++ b/node/libs/storage/src/lib.rs @@ -1,6 +1,7 @@ //! This module is responsible for persistent data storage, it provides schema-aware type-safe database access. Currently we use RocksDB, //! but this crate only exposes an abstraction of a database, so we can easily switch to a different storage engine in the future. +mod in_memory; mod replica_state; mod rocksdb; mod testonly; @@ -10,6 +11,7 @@ mod traits; mod types; pub use crate::{ + in_memory::InMemoryStore, replica_state::FallbackReplicaStateStore, rocksdb::RocksdbStorage, traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, From 14c47c5bf5424078c8ac46bd722ba955ba7d7e58 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 30 Oct 2023 10:36:31 +0200 Subject: [PATCH 2/6] Finish refactoring `storage` crate --- node/libs/storage/src/in_memory.rs | 30 ++++- node/libs/storage/src/lib.rs | 6 +- node/libs/storage/src/rocksdb.rs | 49 +++++++- .../storage/src/{tests.rs => tests/mod.rs} | 113 +++++++++--------- node/libs/storage/src/tests/rocksdb.rs | 43 +++++++ node/libs/storage/src/types.rs | 46 ------- 6 files changed, 175 insertions(+), 112 deletions(-) rename node/libs/storage/src/{tests.rs => tests/mod.rs} (52%) create mode 100644 node/libs/storage/src/tests/rocksdb.rs diff --git a/node/libs/storage/src/in_memory.rs b/node/libs/storage/src/in_memory.rs index 009be3c3..c4a45e7c 100644 --- a/node/libs/storage/src/in_memory.rs +++ b/node/libs/storage/src/in_memory.rs @@ -1,8 +1,8 @@ //! In-memory storage implementation. use crate::{ - traits::{BlockStore, WriteBlockStore}, - types::MissingBlockNumbers, + traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, + types::{MissingBlockNumbers, ReplicaState}, StorageResult, }; use async_trait::async_trait; @@ -67,12 +67,13 @@ impl BlocksInMemoryStore { /// In-memory store. #[derive(Debug)] -pub struct InMemoryStore { +pub struct InMemoryStorage { blocks: Mutex, + replica_state: Mutex>, blocks_sender: watch::Sender, } -impl InMemoryStore { +impl InMemoryStorage { /// Creates a new store containing only the specified `genesis_block`. pub fn new(genesis_block: FinalBlock) -> Self { let genesis_block_number = genesis_block.header.number; @@ -81,13 +82,14 @@ impl InMemoryStore { blocks: BTreeMap::from([(genesis_block_number, genesis_block)]), last_contiguous_block_number: genesis_block_number, }), + replica_state: Mutex::default(), blocks_sender: watch::channel(genesis_block_number).0, } } } #[async_trait] -impl BlockStore for InMemoryStore { +impl BlockStore for InMemoryStorage { async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { Ok(sync::lock(ctx, &self.blocks).await?.head_block().clone()) } @@ -126,7 +128,7 @@ impl BlockStore for InMemoryStore { } #[async_trait] -impl WriteBlockStore for InMemoryStore { +impl WriteBlockStore for InMemoryStorage { async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { sync::lock(ctx, &self.blocks) .await? @@ -135,3 +137,19 @@ impl WriteBlockStore for InMemoryStore { Ok(()) } } + +#[async_trait] +impl ReplicaStateStore for InMemoryStorage { + async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult> { + Ok(sync::lock(ctx, &self.replica_state).await?.clone()) + } + + async fn put_replica_state( + &self, + ctx: &ctx::Ctx, + replica_state: &ReplicaState, + ) -> StorageResult<()> { + *sync::lock(ctx, &self.replica_state).await? = Some(replica_state.clone()); + Ok(()) + } +} diff --git a/node/libs/storage/src/lib.rs b/node/libs/storage/src/lib.rs index a08b3e82..148b1cd2 100644 --- a/node/libs/storage/src/lib.rs +++ b/node/libs/storage/src/lib.rs @@ -3,6 +3,7 @@ mod in_memory; mod replica_state; +#[cfg(feature = "rocksdb")] mod rocksdb; mod testonly; #[cfg(test)] @@ -10,10 +11,11 @@ mod tests; mod traits; mod types; +#[cfg(feature = "rocksdb")] +pub use crate::rocksdb::RocksdbStorage; pub use crate::{ - in_memory::InMemoryStore, + in_memory::InMemoryStorage, replica_state::FallbackReplicaStateStore, - rocksdb::RocksdbStorage, traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, types::{Proposal, ReplicaState, StorageError, StorageResult}, }; diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs index d2f1d43d..8b9666fe 100644 --- a/node/libs/storage/src/rocksdb.rs +++ b/node/libs/storage/src/rocksdb.rs @@ -5,13 +5,13 @@ use crate::{ traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, - types::{DatabaseKey, MissingBlockNumbers, ReplicaState}, + types::{MissingBlockNumbers, ReplicaState}, StorageError, StorageResult, }; use anyhow::Context as _; use async_trait::async_trait; use concurrency::{ctx, scope, sync::watch}; -use rocksdb::{IteratorMode, ReadOptions}; +use rocksdb::{Direction, IteratorMode, ReadOptions}; use roles::validator::{BlockNumber, FinalBlock}; use std::{ fmt, ops, @@ -22,6 +22,51 @@ use std::{ }, }; +/// Enum used to represent a key in the database. It also acts as a separator between different stores. +#[derive(Debug, Clone, PartialEq, Eq)] +enum DatabaseKey { + /// Key used to store the replica state. + /// ReplicaState -> ReplicaState + ReplicaState, + /// Key used to store the finalized blocks. + /// Block(BlockNumber) -> FinalBlock + Block(BlockNumber), +} + +impl DatabaseKey { + /// Starting database key for blocks indexed by number. All other keys in the default column family + /// are lower than this value. + pub(crate) const BLOCKS_START_KEY: &'static [u8] = &u64::MIN.to_be_bytes(); + + /// Iterator mode for the head block (i.e., a block with the greatest number). + pub(crate) const BLOCK_HEAD_ITERATOR: IteratorMode<'static> = + IteratorMode::From(&u64::MAX.to_be_bytes(), Direction::Reverse); + + /// Encodes this key for usage as a RocksDB key. + /// + /// # Implementation note + /// + /// This logic is maintainable only while the amount of non-block keys remains small. + /// If more keys are added (especially if their number is not known statically), prefer using + /// separate column families for them. + pub(crate) fn encode_key(&self) -> Vec { + match self { + // Keys for non-block entries must be smaller than all block keys. + Self::ReplicaState => vec![0], + // Number encoding that monotonically increases with the number + Self::Block(number) => number.0.to_be_bytes().to_vec(), + } + } + + /// Parses the specified bytes as a `Self::Block(_)` key. + pub(crate) fn parse_block_key(raw_key: &[u8]) -> anyhow::Result { + let raw_key = raw_key + .try_into() + .context("Invalid encoding for block key")?; + Ok(BlockNumber(u64::from_be_bytes(raw_key))) + } +} + /// Main struct for the Storage module, it just contains the database. Provides a set of high-level /// atomic operations on the database. It "contains" the following data: /// diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests/mod.rs similarity index 52% rename from node/libs/storage/src/tests.rs rename to node/libs/storage/src/tests/mod.rs index fd104cb0..1190ff2c 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests/mod.rs @@ -1,92 +1,90 @@ use super::*; use crate::types::ReplicaState; +use async_trait::async_trait; use concurrency::ctx; use rand::{seq::SliceRandom, Rng}; use roles::validator::{testonly::make_block, BlockHeader, BlockNumber, FinalBlock, Payload}; use std::iter; -use tempfile::TempDir; -async fn init_store(ctx: &ctx::Ctx, rng: &mut R) -> (FinalBlock, RocksdbStorage, TempDir) { +#[cfg(feature = "rocksdb")] +mod rocksdb; + +#[async_trait] +trait InitStore { + type Store: WriteBlockStore + ReplicaStateStore; + + async fn init_store(&self, ctx: &ctx::Ctx, genesis_block: &FinalBlock) -> Self::Store; +} + +#[async_trait] +impl InitStore for () { + type Store = InMemoryStorage; + + async fn init_store(&self, _ctx: &ctx::Ctx, genesis_block: &FinalBlock) -> Self::Store { + InMemoryStorage::new(genesis_block.clone()) + } +} + +fn genesis_block(rng: &mut impl Rng) -> FinalBlock { let payload = Payload(vec![]); - let genesis_block = FinalBlock { + FinalBlock { header: BlockHeader::genesis(payload.hash()), payload, justification: rng.gen(), - }; - let temp_dir = TempDir::new().unwrap(); - let block_store = RocksdbStorage::new(ctx, &genesis_block, temp_dir.path()) - .await - .unwrap(); - (genesis_block, block_store, temp_dir) + } } -#[tokio::test] -async fn init_store_twice() { - let ctx = ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - - let (genesis_block, block_store, temp_dir) = init_store(&ctx, rng).await; - let block_1 = make_block(rng, &genesis_block.header); - block_store.put_block(&ctx, &block_1).await.unwrap(); - - assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); - assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_1); - - drop(block_store); - let block_store = RocksdbStorage::new(&ctx, &genesis_block, temp_dir.path()) - .await - .unwrap(); - - assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); - assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_1); +fn gen_blocks(rng: &mut impl Rng, genesis_block: FinalBlock, count: usize) -> Vec { + let blocks = iter::successors(Some(genesis_block), |parent| { + Some(make_block(rng, &parent.header)) + }); + blocks.skip(1).take(count).collect() } -#[tokio::test] -async fn test_put_block() { - let ctx = ctx::test_root(&ctx::RealClock); +async fn test_put_block(store_factory: &impl InitStore) { + let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let (genesis_block, block_store, _temp_dir) = init_store(&ctx, rng).await; + let genesis_block = genesis_block(rng); + let block_store = store_factory.init_store(ctx, &genesis_block).await; - assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); - assert_eq!(block_store.head_block(&ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.first_block(ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(ctx).await.unwrap(), genesis_block); let mut block_subscriber = block_store.subscribe_to_block_writes(); assert_eq!(*block_subscriber.borrow_and_update(), BlockNumber(0)); // Test inserting a block with a valid parent. let block_1 = make_block(rng, &genesis_block.header); - block_store.put_block(&ctx, &block_1).await.unwrap(); + block_store.put_block(ctx, &block_1).await.unwrap(); - assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); - assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_1); + assert_eq!(block_store.first_block(ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(ctx).await.unwrap(), block_1); assert_eq!(*block_subscriber.borrow_and_update(), block_1.header.number); // Test inserting a block with a valid parent that is not the genesis. let block_2 = make_block(rng, &block_1.header); - block_store.put_block(&ctx, &block_2).await.unwrap(); + block_store.put_block(ctx, &block_2).await.unwrap(); - assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); - assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_2); + assert_eq!(block_store.first_block(ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(ctx).await.unwrap(), block_2); assert_eq!(*block_subscriber.borrow_and_update(), block_2.header.number); } -fn gen_blocks(rng: &mut impl Rng, genesis_block: FinalBlock, count: usize) -> Vec { - let blocks = iter::successors(Some(genesis_block), |parent| { - Some(make_block(rng, &parent.header)) - }); - blocks.skip(1).take(count).collect() +#[tokio::test] +async fn putting_block_for_in_memory_store() { + test_put_block(&()).await; } -#[tokio::test] -async fn test_get_missing_block_numbers() { - let ctx = ctx::test_root(&ctx::RealClock); +async fn test_get_missing_block_numbers(store_factory: &impl InitStore) { + let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let (genesis_block, block_store, _temp_dir) = init_store(&ctx, rng).await; + let genesis_block = genesis_block(rng); + let block_store = store_factory.init_store(ctx, &genesis_block).await; let mut blocks = gen_blocks(rng, genesis_block, 100); blocks.shuffle(rng); assert!(block_store - .missing_block_numbers(&ctx, BlockNumber(0)..BlockNumber(101)) + .missing_block_numbers(ctx, BlockNumber(0)..BlockNumber(101)) .await .unwrap() .into_iter() @@ -94,15 +92,13 @@ async fn test_get_missing_block_numbers() { .eq(1..101)); for (i, block) in blocks.iter().enumerate() { - block_store.put_block(&ctx, block).await.unwrap(); + block_store.put_block(ctx, block).await.unwrap(); let missing_block_numbers = block_store - .missing_block_numbers(&ctx, BlockNumber(0)..BlockNumber(101)) - .await - .unwrap(); - let last_contiguous_block_number = block_store - .last_contiguous_block_number(&ctx) + .missing_block_numbers(ctx, BlockNumber(0)..BlockNumber(101)) .await .unwrap(); + let last_contiguous_block_number = + block_store.last_contiguous_block_number(ctx).await.unwrap(); let mut expected_block_numbers: Vec<_> = blocks[(i + 1)..].iter().map(|b| b.header.number).collect(); @@ -120,6 +116,11 @@ async fn test_get_missing_block_numbers() { } } +#[tokio::test] +async fn getting_missing_block_numbers_for_in_memory_store() { + test_get_missing_block_numbers(&()).await; +} + #[test] fn test_schema_encode_decode() { let ctx = ctx::test_root(&ctx::RealClock); diff --git a/node/libs/storage/src/tests/rocksdb.rs b/node/libs/storage/src/tests/rocksdb.rs new file mode 100644 index 00000000..0188143b --- /dev/null +++ b/node/libs/storage/src/tests/rocksdb.rs @@ -0,0 +1,43 @@ +use super::*; +use tempfile::TempDir; + +#[async_trait] +impl InitStore for TempDir { + type Store = RocksdbStorage; + + async fn init_store(&self, ctx: &ctx::Ctx, genesis_block: &FinalBlock) -> Self::Store { + RocksdbStorage::new(ctx, genesis_block, self.path()) + .await + .expect("Failed initializing RocksDB") + } +} + +#[tokio::test] +async fn initializing_store_twice() { + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let genesis_block = genesis_block(rng); + let temp_dir = TempDir::new().unwrap(); + let block_store = temp_dir.init_store(ctx, &genesis_block).await; + let block_1 = make_block(rng, &genesis_block.header); + block_store.put_block(ctx, &block_1).await.unwrap(); + + assert_eq!(block_store.first_block(ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(ctx).await.unwrap(), block_1); + + drop(block_store); + let block_store = temp_dir.init_store(ctx, &genesis_block).await; + + assert_eq!(block_store.first_block(ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(ctx).await.unwrap(), block_1); +} + +#[tokio::test] +async fn putting_block_for_rocksdb_store() { + test_put_block(&TempDir::new().unwrap()).await; +} + +#[tokio::test] +async fn getting_missing_block_numbers_for_rocksdb_store() { + test_get_missing_block_numbers(&TempDir::new().unwrap()).await; +} diff --git a/node/libs/storage/src/types.rs b/node/libs/storage/src/types.rs index ac4337ac..1b437ac5 100644 --- a/node/libs/storage/src/types.rs +++ b/node/libs/storage/src/types.rs @@ -2,56 +2,10 @@ use anyhow::Context as _; use concurrency::ctx; -use rocksdb::{Direction, IteratorMode}; use roles::validator::{self, BlockNumber}; use schema::{proto::storage as proto, read_required, required, ProtoFmt}; use std::{iter, ops}; -/// Enum used to represent a key in the database. It also acts as a separator between different stores. -#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) enum DatabaseKey { - /// Key used to store the replica state. - /// ReplicaState -> ReplicaState - ReplicaState, - /// Key used to store the finalized blocks. - /// Block(BlockNumber) -> FinalBlock - Block(BlockNumber), -} - -impl DatabaseKey { - /// Starting database key for blocks indexed by number. All other keys in the default column family - /// are lower than this value. - pub(crate) const BLOCKS_START_KEY: &'static [u8] = &u64::MIN.to_be_bytes(); - - /// Iterator mode for the head block (i.e., a block with the greatest number). - pub(crate) const BLOCK_HEAD_ITERATOR: IteratorMode<'static> = - IteratorMode::From(&u64::MAX.to_be_bytes(), Direction::Reverse); - - /// Encodes this key for usage as a RocksDB key. - /// - /// # Implementation note - /// - /// This logic is maintainable only while the amount of non-block keys remains small. - /// If more keys are added (especially if their number is not known statically), prefer using - /// separate column families for them. - pub(crate) fn encode_key(&self) -> Vec { - match self { - // Keys for non-block entries must be smaller than all block keys. - Self::ReplicaState => vec![0], - // Number encoding that monotonically increases with the number - Self::Block(number) => number.0.to_be_bytes().to_vec(), - } - } - - /// Parses the specified bytes as a `Self::Block(_)` key. - pub(crate) fn parse_block_key(raw_key: &[u8]) -> anyhow::Result { - let raw_key = raw_key - .try_into() - .context("Invalid encoding for block key")?; - Ok(BlockNumber(u64::from_be_bytes(raw_key))) - } -} - /// A payload of a proposed block which is not known to be finalized yet. /// Replicas have to persist such proposed payloads for liveness: /// consensus may finalize a block without knowing a payload in case of reproposals. From 28740c64ea68ded6512445fcd36cf61b3eb8c462 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 30 Oct 2023 10:54:00 +0200 Subject: [PATCH 3/6] Update RocksDB uses in workspace --- node/actors/consensus/src/testonly/make.rs | 10 ++----- node/actors/consensus/src/testonly/run.rs | 8 ++---- node/actors/executor/src/tests.rs | 26 ++++++++---------- node/actors/sync_blocks/src/peers/tests.rs | 18 +++++-------- .../sync_blocks/src/tests/end_to_end.rs | 27 ++++++------------- node/actors/sync_blocks/src/tests/mod.rs | 12 +++------ node/tools/Cargo.toml | 2 +- 7 files changed, 35 insertions(+), 68 deletions(-) diff --git a/node/actors/consensus/src/testonly/make.rs b/node/actors/consensus/src/testonly/make.rs index 7215ec7a..f13fb425 100644 --- a/node/actors/consensus/src/testonly/make.rs +++ b/node/actors/consensus/src/testonly/make.rs @@ -7,8 +7,7 @@ use crate::{ use concurrency::ctx; use roles::validator; use std::sync::Arc; -use storage::{FallbackReplicaStateStore, RocksdbStorage}; -use tempfile::tempdir; +use storage::{FallbackReplicaStateStore, InMemoryStorage}; use utils::pipe::{self, DispatcherPipe}; /// This creates a mock Consensus struct for unit tests. @@ -18,13 +17,8 @@ pub async fn make_consensus( validator_set: &validator::ValidatorSet, genesis_block: &validator::FinalBlock, ) -> (Consensus, DispatcherPipe) { - // Create a temporary folder. - let temp_dir = tempdir().unwrap(); - let temp_file = temp_dir.path().join("block_store"); // Initialize the storage. - let storage = RocksdbStorage::new(ctx, genesis_block, &temp_file) - .await - .unwrap(); + let storage = InMemoryStorage::new(genesis_block.clone()); // Create the pipe. let (consensus_pipe, dispatcher_pipe) = pipe::new(); diff --git a/node/actors/consensus/src/testonly/run.rs b/node/actors/consensus/src/testonly/run.rs index 6a1a1d93..a5d09c7a 100644 --- a/node/actors/consensus/src/testonly/run.rs +++ b/node/actors/consensus/src/testonly/run.rs @@ -7,7 +7,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use storage::{FallbackReplicaStateStore, RocksdbStorage}; +use storage::{FallbackReplicaStateStore, InMemoryStorage}; use tracing::Instrument as _; use utils::pipe; @@ -101,11 +101,7 @@ async fn run_nodes( network_pipes.insert(validator_key.public(), network_actor_pipe); s.spawn( async { - let dir = tempfile::tempdir().context("tempdir()")?; - let storage = - RocksdbStorage::new(ctx, &genesis_block, &dir.path().join("storage")) - .await - .context("RocksdbStorage")?; + let storage = InMemoryStorage::new(genesis_block.clone()); let storage = FallbackReplicaStateStore::from_store(Arc::new(storage)); let consensus = Consensus::new( diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index c85750e3..ac335e8c 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -7,9 +7,9 @@ use network::testonly::Instance; use rand::Rng; use roles::validator::{BlockNumber, Payload}; use std::collections::HashMap; -use storage::{BlockStore, RocksdbStorage, StorageError}; +use storage::{BlockStore, InMemoryStorage, StorageError}; -async fn run_executor(ctx: &ctx::Ctx, executor: Executor) -> anyhow::Result<()> { +async fn run_executor(ctx: &ctx::Ctx, executor: Executor) -> anyhow::Result<()> { executor.run(ctx).await.or_else(|err| { if err.root_cause().is::() { Ok(()) // Test has successfully finished @@ -22,7 +22,7 @@ async fn run_executor(ctx: &ctx::Ctx, executor: Executor) -> any async fn store_final_blocks( ctx: &ctx::Ctx, mut blocks_receiver: channel::UnboundedReceiver, - storage: Arc, + storage: Arc, ) -> anyhow::Result<()> { while let Ok(block) = blocks_receiver.recv(ctx).await { tracing::trace!(number = %block.header.number, "Finalized new block"); @@ -73,9 +73,9 @@ impl FullValidatorConfig { fn into_executor( self, - storage: Arc, + storage: Arc, ) -> ( - Executor, + Executor, channel::UnboundedReceiver, ) { let (blocks_sender, blocks_receiver) = channel::unbounded(); @@ -100,9 +100,8 @@ async fn executing_single_validator() { let validator = FullValidatorConfig::for_single_validator(rng); let genesis_block = &validator.node_config.genesis_block; - let temp_dir = tempfile::tempdir().unwrap(); - let storage = RocksdbStorage::new(ctx, genesis_block, temp_dir.path()); - let storage = Arc::new(storage.await.unwrap()); + let storage = InMemoryStorage::new(genesis_block.clone()); + let storage = Arc::new(storage); let (executor, mut blocks_receiver) = validator.into_executor(storage); scope::run!(ctx, |ctx, s| async { @@ -151,13 +150,10 @@ async fn executing_validator_and_external_node() { .insert(external_node_key.public()); let genesis_block = &validator.node_config.genesis_block; - let temp_dir = tempfile::tempdir().unwrap(); - let validator_storage = - RocksdbStorage::new(ctx, genesis_block, &temp_dir.path().join("validator")).await; - let validator_storage = Arc::new(validator_storage.unwrap()); - let external_node_storage = - RocksdbStorage::new(ctx, genesis_block, &temp_dir.path().join("en")).await; - let external_node_storage = Arc::new(external_node_storage.unwrap()); + let validator_storage = InMemoryStorage::new(genesis_block.clone()); + let validator_storage = Arc::new(validator_storage); + let external_node_storage = InMemoryStorage::new(genesis_block.clone()); + let external_node_storage = Arc::new(external_node_storage); let mut en_subscriber = external_node_storage.subscribe_to_block_writes(); let (validator, blocks_receiver) = validator.into_executor(validator_storage.clone()); diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs index cadbf1f5..390b01e9 100644 --- a/node/actors/sync_blocks/src/peers/tests.rs +++ b/node/actors/sync_blocks/src/peers/tests.rs @@ -6,7 +6,7 @@ use concurrency::time; use rand::{rngs::StdRng, seq::IteratorRandom, Rng}; use roles::validator; use std::{collections::HashSet, fmt}; -use storage::RocksdbStorage; +use storage::InMemoryStorage; use test_casing::{test_casing, Product}; const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5); @@ -64,15 +64,13 @@ async fn wait_for_stored_block( async fn test_peer_states(test: T) { concurrency::testonly::abort_on_panic(); - let storage_dir = tempfile::tempdir().unwrap(); - let ctx = &ctx::test_root(&ctx::RealClock).with_timeout(TEST_TIMEOUT); let clock = ctx::ManualClock::new(); let ctx = &ctx::test_with_clock(ctx, &clock); let mut rng = ctx.rng(); let test_validators = TestValidators::new(4, T::BLOCK_COUNT, &mut rng); - let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path()); - let storage = Arc::new(storage.await.unwrap()); + let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone()); + let storage = Arc::new(storage); test.initialize_storage(ctx, storage.as_ref(), &test_validators) .await; @@ -814,12 +812,11 @@ async fn requesting_blocks_with_unreliable_peers( #[tokio::test] async fn processing_invalid_sync_states() { - let storage_dir = tempfile::tempdir().unwrap(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let test_validators = TestValidators::new(4, 3, rng); - let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path()); - let storage = Arc::new(storage.await.unwrap()); + let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone()); + let storage = Arc::new(storage); let (message_sender, _) = channel::unbounded(); let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config()); @@ -849,12 +846,11 @@ async fn processing_invalid_sync_states() { #[tokio::test] async fn processing_invalid_blocks() { - let storage_dir = tempfile::tempdir().unwrap(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let test_validators = TestValidators::new(4, 3, rng); - let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path()); - let storage = Arc::new(storage.await.unwrap()); + let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone()); + let storage = Arc::new(storage); let (message_sender, _) = channel::unbounded(); let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config()); diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 46c30a27..861419f9 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -7,8 +7,8 @@ use concurrency::ctx::channel; use network::testonly::Instance as NetworkInstance; use rand::seq::SliceRandom; use roles::node; -use std::{fmt, path::Path}; -use storage::RocksdbStorage; +use std::fmt; +use storage::InMemoryStorage; use test_casing::test_casing; use tracing::Instrument; @@ -99,18 +99,13 @@ impl Node { self.network.gossip_config().key.public() } - #[instrument(level = "trace", skip(ctx, test_validators, storage_dir), err)] - async fn run( - mut self, - ctx: &ctx::Ctx, - test_validators: &TestValidators, - storage_dir: &Path, - ) -> anyhow::Result<()> { + #[instrument(level = "trace", skip(ctx, test_validators), err)] + async fn run(mut self, ctx: &ctx::Ctx, test_validators: &TestValidators) -> anyhow::Result<()> { let key = self.key(); let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new(); let (network_actor_pipe, network_dispatcher_pipe) = pipe::new(); - let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir); - let storage = Arc::new(storage.await.unwrap()); + let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone()); + let storage = Arc::new(storage); let sync_blocks_config = test_validators.test_config(); let sync_blocks = SyncBlocks::new( @@ -235,23 +230,17 @@ async fn test_sync_blocks(test: T) { concurrency::testonly::abort_on_panic(); - let temp_dir = tempfile::TempDir::new().unwrap(); let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64)) .with_timeout(TEST_TIMEOUT * CLOCK_SPEEDUP); let (node_count, gossip_peers) = test.network_params(); let (network, nodes) = GossipNetwork::new(&mut ctx.rng(), node_count, gossip_peers); scope::run!(ctx, |ctx, s| async { - for (i, node) in nodes.into_iter().enumerate() { + for node in nodes { let test_validators = network.test_validators.clone(); - let storage_dir = temp_dir.path().join(i.to_string()); s.spawn_bg(async { let test_validators = test_validators; - let storage_dir = storage_dir; let key = node.key(); - let err = node - .run(ctx, &test_validators, &storage_dir) - .await - .unwrap_err(); + let err = node.run(ctx, &test_validators).await.unwrap_err(); tracing::trace!(?key, "Node task completed"); if err.root_cause().is::() { diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs index 4bb49a95..6ef9faa4 100644 --- a/node/actors/sync_blocks/src/tests/mod.rs +++ b/node/actors/sync_blocks/src/tests/mod.rs @@ -13,7 +13,7 @@ use roles::validator::{ BlockHeader, BlockNumber, CommitQC, FinalBlock, Payload, ValidatorSet, }; use std::iter; -use storage::RocksdbStorage; +use storage::InMemoryStorage; use utils::pipe; mod end_to_end; @@ -106,7 +106,6 @@ impl TestValidators { async fn subscribing_to_state_updates() { concurrency::testonly::abort_on_panic(); - let storage_dir = tempfile::tempdir().unwrap(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let genesis_block = make_genesis_block(rng); @@ -114,9 +113,7 @@ async fn subscribing_to_state_updates() { let block_2 = make_block(rng, &block_1.header); let block_3 = make_block(rng, &block_2.header); - let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path()) - .await - .unwrap(); + let storage = InMemoryStorage::new(genesis_block.clone()); let storage = &Arc::new(storage); let (actor_pipe, _dispatcher_pipe) = pipe::new(); let actor = SyncBlocks::new(ctx, actor_pipe, storage.clone(), rng.gen()) @@ -182,13 +179,12 @@ async fn subscribing_to_state_updates() { async fn getting_blocks() { concurrency::testonly::abort_on_panic(); - let storage_dir = tempfile::tempdir().unwrap(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let genesis_block = make_genesis_block(rng); - let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path()); - let storage = Arc::new(storage.await.unwrap()); + let storage = InMemoryStorage::new(genesis_block.clone()); + let storage = Arc::new(storage); let blocks = iter::successors(Some(genesis_block), |parent| { Some(make_block(rng, &parent.header)) }); diff --git a/node/tools/Cargo.toml b/node/tools/Cargo.toml index 655b1a6a..4129497f 100644 --- a/node/tools/Cargo.toml +++ b/node/tools/Cargo.toml @@ -20,7 +20,7 @@ vise-exporter.workspace = true concurrency = { path = "../libs/concurrency" } crypto = { path = "../libs/crypto" } roles = { path = "../libs/roles" } -storage = { path = "../libs/storage" } +storage = { path = "../libs/storage", features = ["rocksdb"] } schema = { path = "../libs/schema" } utils = { path = "../libs/utils" } From 978bc925bf7128747619cbc6c6e0fc260d27effa Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 30 Oct 2023 12:26:10 +0200 Subject: [PATCH 4/6] Remove unused `tempfile` dependency --- node/Cargo.lock | 3 --- node/actors/consensus/Cargo.toml | 1 - node/actors/executor/Cargo.toml | 1 - node/actors/sync_blocks/Cargo.toml | 1 - 4 files changed, 6 deletions(-) diff --git a/node/Cargo.lock b/node/Cargo.lock index 390ba435..048dcdd5 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -414,7 +414,6 @@ dependencies = [ "roles", "schema", "storage", - "tempfile", "thiserror", "tokio", "tracing", @@ -611,7 +610,6 @@ dependencies = [ "schema", "storage", "sync_blocks", - "tempfile", "tokio", "tracing", "utils", @@ -1926,7 +1924,6 @@ dependencies = [ "rand", "roles", "storage", - "tempfile", "test-casing", "thiserror", "tokio", diff --git a/node/actors/consensus/Cargo.toml b/node/actors/consensus/Cargo.toml index beab70a8..b47c0d4f 100644 --- a/node/actors/consensus/Cargo.toml +++ b/node/actors/consensus/Cargo.toml @@ -10,7 +10,6 @@ license.workspace = true anyhow.workspace = true once_cell.workspace = true rand.workspace = true -tempfile.workspace = true thiserror.workspace = true tracing.workspace = true vise.workspace = true diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml index 5a8e9b4f..574cc456 100644 --- a/node/actors/executor/Cargo.toml +++ b/node/actors/executor/Cargo.toml @@ -24,5 +24,4 @@ sync_blocks = { path = "../sync_blocks" } [dev-dependencies] rand.workspace = true -tempfile.workspace = true tokio.workspace = true diff --git a/node/actors/sync_blocks/Cargo.toml b/node/actors/sync_blocks/Cargo.toml index 5657f754..646132d2 100644 --- a/node/actors/sync_blocks/Cargo.toml +++ b/node/actors/sync_blocks/Cargo.toml @@ -22,6 +22,5 @@ network = { path = "../network" } assert_matches.workspace = true async-trait.workspace = true rand.workspace = true -tempfile.workspace = true test-casing.workspace = true tokio.workspace = true From b70a8b1c4a40352d5d935b789801c617ebe6ec17 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 30 Oct 2023 12:30:17 +0200 Subject: [PATCH 5/6] Check crates with default features in CI --- .github/workflows/rust.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 30b552c1..6e33cd7a 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -46,10 +46,11 @@ jobs: run: cargo deny check - name: fmt run: cargo fmt --all --check - - name: cranky - run: cargo cranky --all-targets --all-features + - name: cranky (all features) + run: cargo cranky --workspace --all-targets --all-features + - name: cranky (default features) + run: cargo cranky --workspace --exclude tools --all-targets - name: build run: cargo build --all-targets --locked - name: test run: cargo nextest run --profile ci - From 582d21e75f5cfcf8bcb6510c6b851e60249e626a Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 30 Oct 2023 12:32:08 +0200 Subject: [PATCH 6/6] Update `cargo-install` GitHub action --- .github/workflows/rust.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 6e33cd7a..07bf2610 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -31,15 +31,15 @@ jobs: - uses: mozilla-actions/sccache-action@v0.0.3 - name: install nextest - uses: baptiste0928/cargo-install@v1 + uses: baptiste0928/cargo-install@v2 with: crate: cargo-nextest - name: install cranky - uses: baptiste0928/cargo-install@v1 + uses: baptiste0928/cargo-install@v2 with: crate: cargo-cranky - name: install deny - uses: baptiste0928/cargo-install@v1 + uses: baptiste0928/cargo-install@v2 with: crate: cargo-deny - name: deny