feat: In-memory storage implementation (#18)
## What ❔

Adds an in-memory block and replica state storage implementation and uses it
in tests instead of the RocksDB implementation. The RocksDB implementation
remains in the `storage` crate, but it is now gated behind an off-by-default
feature.

## Why ❔

RocksDB pulls in more dependencies (including compiling RocksDB itself), which
can be overkill, especially for tests.
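
For orientation, here is a minimal sketch of the shape such an in-memory store can take: an ordered map of finalized blocks guarded by a mutex and seeded with the genesis block. The type and field names below are simplified assumptions for illustration only; the crate's actual `InMemoryStorage` also implements the storage traits that the executor and `sync_blocks` actors rely on (e.g. `BlockStore`) and exposes helpers such as `subscribe_to_block_writes`, as the test diffs below show.

```rust
use std::{collections::BTreeMap, sync::Mutex};

// Simplified stand-ins for the crate's real types; names are assumptions.
type BlockNumber = u64;

#[derive(Clone, Debug)]
struct FinalBlock {
    number: BlockNumber,
    // header, payload, justification, ... omitted for brevity
}

/// Sketch of an in-memory block store: finalized blocks keyed by number,
/// seeded with the genesis block on construction.
struct InMemoryStorage {
    blocks: Mutex<BTreeMap<BlockNumber, FinalBlock>>,
}

impl InMemoryStorage {
    fn new(genesis_block: FinalBlock) -> Self {
        let mut blocks = BTreeMap::new();
        blocks.insert(genesis_block.number, genesis_block);
        Self {
            blocks: Mutex::new(blocks),
        }
    }

    fn put_block(&self, block: FinalBlock) {
        self.blocks.lock().unwrap().insert(block.number, block);
    }

    fn head_block(&self) -> FinalBlock {
        // `BTreeMap` keeps keys ordered, so the last entry is the highest block.
        self.blocks
            .lock()
            .unwrap()
            .values()
            .next_back()
            .expect("storage always holds at least the genesis block")
            .clone()
    }
}

fn main() {
    let storage = InMemoryStorage::new(FinalBlock { number: 0 });
    storage.put_block(FinalBlock { number: 1 });
    assert_eq!(storage.head_block().number, 1);
}
```

Because nothing here touches the filesystem, tests no longer need temporary directories, which is why the `tempfile` dev-dependency disappears from several manifests in the diff below.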

---------

Co-authored-by: pompon0 <[email protected]>
slowli and pompon0 authored Oct 31, 2023
1 parent cb34e4c commit 3705738
Showing 19 changed files with 355 additions and 186 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/rust.yml
@@ -31,25 +31,26 @@ jobs:

- uses: mozilla-actions/[email protected]
- name: install nextest
uses: baptiste0928/cargo-install@v1
uses: baptiste0928/cargo-install@v2
with:
crate: cargo-nextest
- name: install cranky
uses: baptiste0928/cargo-install@v1
uses: baptiste0928/cargo-install@v2
with:
crate: cargo-cranky
- name: install deny
uses: baptiste0928/cargo-install@v1
uses: baptiste0928/cargo-install@v2
with:
crate: cargo-deny
- name: deny
run: cargo deny check
- name: fmt
run: cargo fmt --all --check
- name: cranky
run: cargo cranky --all-targets --all-features
- name: cranky (all features)
run: cargo cranky --workspace --all-targets --all-features
- name: cranky (default features)
run: cargo cranky --workspace --exclude tools --all-targets
- name: build
run: cargo build --all-targets --locked
- name: test
run: cargo nextest run --profile ci

3 changes: 0 additions & 3 deletions node/Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion node/actors/consensus/Cargo.toml
@@ -10,7 +10,6 @@ license.workspace = true
anyhow.workspace = true
once_cell.workspace = true
rand.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tracing.workspace = true
vise.workspace = true
10 changes: 2 additions & 8 deletions node/actors/consensus/src/testonly/make.rs
@@ -7,8 +7,7 @@ use crate::{
use concurrency::ctx;
use roles::validator;
use std::sync::Arc;
use storage::{FallbackReplicaStateStore, RocksdbStorage};
use tempfile::tempdir;
use storage::{FallbackReplicaStateStore, InMemoryStorage};
use utils::pipe::{self, DispatcherPipe};

/// This creates a mock Consensus struct for unit tests.
@@ -18,13 +17,8 @@ pub async fn make_consensus(
validator_set: &validator::ValidatorSet,
genesis_block: &validator::FinalBlock,
) -> (Consensus, DispatcherPipe<InputMessage, OutputMessage>) {
// Create a temporary folder.
let temp_dir = tempdir().unwrap();
let temp_file = temp_dir.path().join("block_store");
// Initialize the storage.
let storage = RocksdbStorage::new(ctx, genesis_block, &temp_file)
.await
.unwrap();
let storage = InMemoryStorage::new(genesis_block.clone());
// Create the pipe.
let (consensus_pipe, dispatcher_pipe) = pipe::new();

8 changes: 2 additions & 6 deletions node/actors/consensus/src/testonly/run.rs
@@ -7,7 +7,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use storage::{FallbackReplicaStateStore, RocksdbStorage};
use storage::{FallbackReplicaStateStore, InMemoryStorage};
use tracing::Instrument as _;
use utils::pipe;

@@ -101,11 +101,7 @@ async fn run_nodes(
network_pipes.insert(validator_key.public(), network_actor_pipe);
s.spawn(
async {
let dir = tempfile::tempdir().context("tempdir()")?;
let storage =
RocksdbStorage::new(ctx, &genesis_block, &dir.path().join("storage"))
.await
.context("RocksdbStorage")?;
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = FallbackReplicaStateStore::from_store(Arc::new(storage));

let consensus = Consensus::new(
1 change: 0 additions & 1 deletion node/actors/executor/Cargo.toml
@@ -24,5 +24,4 @@ sync_blocks = { path = "../sync_blocks" }

[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
tokio.workspace = true
26 changes: 11 additions & 15 deletions node/actors/executor/src/tests.rs
@@ -7,9 +7,9 @@ use network::testonly::Instance;
use rand::Rng;
use roles::validator::{BlockNumber, Payload};
use std::collections::HashMap;
use storage::{BlockStore, RocksdbStorage, StorageError};
use storage::{BlockStore, InMemoryStorage, StorageError};

async fn run_executor(ctx: &ctx::Ctx, executor: Executor<RocksdbStorage>) -> anyhow::Result<()> {
async fn run_executor(ctx: &ctx::Ctx, executor: Executor<InMemoryStorage>) -> anyhow::Result<()> {
executor.run(ctx).await.or_else(|err| {
if err.root_cause().is::<ctx::Canceled>() {
Ok(()) // Test has successfully finished
@@ -22,7 +22,7 @@ async fn run_executor(ctx: &ctx::Ctx, executor: Executor<RocksdbStorage>) -> any
async fn store_final_blocks(
ctx: &ctx::Ctx,
mut blocks_receiver: channel::UnboundedReceiver<FinalBlock>,
storage: Arc<RocksdbStorage>,
storage: Arc<InMemoryStorage>,
) -> anyhow::Result<()> {
while let Ok(block) = blocks_receiver.recv(ctx).await {
tracing::trace!(number = %block.header.number, "Finalized new block");
@@ -73,9 +73,9 @@ impl FullValidatorConfig {

fn into_executor(
self,
storage: Arc<RocksdbStorage>,
storage: Arc<InMemoryStorage>,
) -> (
Executor<RocksdbStorage>,
Executor<InMemoryStorage>,
channel::UnboundedReceiver<FinalBlock>,
) {
let (blocks_sender, blocks_receiver) = channel::unbounded();
@@ -100,9 +100,8 @@ async fn executing_single_validator() {

let validator = FullValidatorConfig::for_single_validator(rng);
let genesis_block = &validator.node_config.genesis_block;
let temp_dir = tempfile::tempdir().unwrap();
let storage = RocksdbStorage::new(ctx, genesis_block, temp_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = Arc::new(storage);
let (executor, mut blocks_receiver) = validator.into_executor(storage);

scope::run!(ctx, |ctx, s| async {
@@ -151,13 +150,10 @@ async fn executing_validator_and_external_node() {
.insert(external_node_key.public());

let genesis_block = &validator.node_config.genesis_block;
let temp_dir = tempfile::tempdir().unwrap();
let validator_storage =
RocksdbStorage::new(ctx, genesis_block, &temp_dir.path().join("validator")).await;
let validator_storage = Arc::new(validator_storage.unwrap());
let external_node_storage =
RocksdbStorage::new(ctx, genesis_block, &temp_dir.path().join("en")).await;
let external_node_storage = Arc::new(external_node_storage.unwrap());
let validator_storage = InMemoryStorage::new(genesis_block.clone());
let validator_storage = Arc::new(validator_storage);
let external_node_storage = InMemoryStorage::new(genesis_block.clone());
let external_node_storage = Arc::new(external_node_storage);
let mut en_subscriber = external_node_storage.subscribe_to_block_writes();

let (validator, blocks_receiver) = validator.into_executor(validator_storage.clone());
1 change: 0 additions & 1 deletion node/actors/sync_blocks/Cargo.toml
@@ -22,6 +22,5 @@ network = { path = "../network" }
assert_matches.workspace = true
async-trait.workspace = true
rand.workspace = true
tempfile.workspace = true
test-casing.workspace = true
tokio.workspace = true
18 changes: 7 additions & 11 deletions node/actors/sync_blocks/src/peers/tests.rs
@@ -6,7 +6,7 @@ use concurrency::time;
use rand::{rngs::StdRng, seq::IteratorRandom, Rng};
use roles::validator;
use std::{collections::HashSet, fmt};
use storage::RocksdbStorage;
use storage::InMemoryStorage;
use test_casing::{test_casing, Product};

const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5);
@@ -64,15 +64,13 @@ async fn wait_for_stored_block(
async fn test_peer_states<T: Test>(test: T) {
concurrency::testonly::abort_on_panic();

let storage_dir = tempfile::tempdir().unwrap();

let ctx = &ctx::test_root(&ctx::RealClock).with_timeout(TEST_TIMEOUT);
let clock = ctx::ManualClock::new();
let ctx = &ctx::test_with_clock(ctx, &clock);
let mut rng = ctx.rng();
let test_validators = TestValidators::new(4, T::BLOCK_COUNT, &mut rng);
let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
let storage = Arc::new(storage);
test.initialize_storage(ctx, storage.as_ref(), &test_validators)
.await;

@@ -814,12 +812,11 @@ async fn requesting_blocks_with_unreliable_peers(

#[tokio::test]
async fn processing_invalid_sync_states() {
let storage_dir = tempfile::tempdir().unwrap();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let test_validators = TestValidators::new(4, 3, rng);
let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
let storage = Arc::new(storage);

let (message_sender, _) = channel::unbounded();
let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config());
@@ -849,12 +846,11 @@ async fn processing_invalid_sync_states() {

#[tokio::test]
async fn processing_invalid_blocks() {
let storage_dir = tempfile::tempdir().unwrap();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let test_validators = TestValidators::new(4, 3, rng);
let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
let storage = Arc::new(storage);

let (message_sender, _) = channel::unbounded();
let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config());
27 changes: 8 additions & 19 deletions node/actors/sync_blocks/src/tests/end_to_end.rs
@@ -7,8 +7,8 @@ use concurrency::ctx::channel;
use network::testonly::Instance as NetworkInstance;
use rand::seq::SliceRandom;
use roles::node;
use std::{fmt, path::Path};
use storage::RocksdbStorage;
use std::fmt;
use storage::InMemoryStorage;
use test_casing::test_casing;
use tracing::Instrument;

@@ -99,18 +99,13 @@ impl Node {
self.network.gossip_config().key.public()
}

#[instrument(level = "trace", skip(ctx, test_validators, storage_dir), err)]
async fn run(
mut self,
ctx: &ctx::Ctx,
test_validators: &TestValidators,
storage_dir: &Path,
) -> anyhow::Result<()> {
#[instrument(level = "trace", skip(ctx, test_validators), err)]
async fn run(mut self, ctx: &ctx::Ctx, test_validators: &TestValidators) -> anyhow::Result<()> {
let key = self.key();
let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new();
let (network_actor_pipe, network_dispatcher_pipe) = pipe::new();
let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir);
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
let storage = Arc::new(storage);

let sync_blocks_config = test_validators.test_config();
let sync_blocks = SyncBlocks::new(
@@ -235,23 +230,17 @@ async fn test_sync_blocks<T: GossipNetworkTest>(test: T) {

concurrency::testonly::abort_on_panic();

let temp_dir = tempfile::TempDir::new().unwrap();
let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64))
.with_timeout(TEST_TIMEOUT * CLOCK_SPEEDUP);
let (node_count, gossip_peers) = test.network_params();
let (network, nodes) = GossipNetwork::new(&mut ctx.rng(), node_count, gossip_peers);
scope::run!(ctx, |ctx, s| async {
for (i, node) in nodes.into_iter().enumerate() {
for node in nodes {
let test_validators = network.test_validators.clone();
let storage_dir = temp_dir.path().join(i.to_string());
s.spawn_bg(async {
let test_validators = test_validators;
let storage_dir = storage_dir;
let key = node.key();
let err = node
.run(ctx, &test_validators, &storage_dir)
.await
.unwrap_err();
let err = node.run(ctx, &test_validators).await.unwrap_err();

tracing::trace!(?key, "Node task completed");
if err.root_cause().is::<ctx::Canceled>() {
12 changes: 4 additions & 8 deletions node/actors/sync_blocks/src/tests/mod.rs
@@ -13,7 +13,7 @@ use roles::validator::{
BlockHeader, BlockNumber, CommitQC, FinalBlock, Payload, ValidatorSet,
};
use std::iter;
use storage::RocksdbStorage;
use storage::InMemoryStorage;
use utils::pipe;

mod end_to_end;
@@ -106,17 +106,14 @@ impl TestValidators {
async fn subscribing_to_state_updates() {
concurrency::testonly::abort_on_panic();

let storage_dir = tempfile::tempdir().unwrap();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let genesis_block = make_genesis_block(rng);
let block_1 = make_block(rng, &genesis_block.header);
let block_2 = make_block(rng, &block_1.header);
let block_3 = make_block(rng, &block_2.header);

let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path())
.await
.unwrap();
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = &Arc::new(storage);
let (actor_pipe, _dispatcher_pipe) = pipe::new();
let actor = SyncBlocks::new(ctx, actor_pipe, storage.clone(), rng.gen())
@@ -184,13 +181,12 @@ async fn subscribing_to_state_updates() {
async fn getting_blocks() {
concurrency::testonly::abort_on_panic();

let storage_dir = tempfile::tempdir().unwrap();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let genesis_block = make_genesis_block(rng);

let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = Arc::new(storage);
let blocks = iter::successors(Some(genesis_block), |parent| {
Some(make_block(rng, &parent.header))
});
7 changes: 6 additions & 1 deletion node/libs/storage/Cargo.toml
@@ -10,7 +10,7 @@ license.workspace = true
anyhow.workspace = true
async-trait.workspace = true
rand.workspace = true
rocksdb.workspace = true
rocksdb = { workspace = true, optional = true }
thiserror.workspace = true
tracing.workspace = true

@@ -23,3 +23,8 @@ assert_matches.workspace = true
tempfile.workspace = true
test-casing.workspace = true
tokio.workspace = true

[features]
default = []
# Enables RocksDB-based storage.
rocksdb = ["dep:rocksdb"]
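
As a rough illustration of how the optional dependency and the `dep:rocksdb` feature mapping play out in code (the module names below are assumptions, not the crate's actual layout), the RocksDB-backed type is only compiled when the feature is enabled, while the in-memory store is always available:

```rust
// Sketch only: inline modules stand in for the crate's real source files.

#[cfg(feature = "rocksdb")]
pub mod rocksdb_backend {
    // Compiled only when building with `--features rocksdb`, which in turn
    // pulls in the optional `rocksdb` crate via the `dep:rocksdb` mapping.
    pub struct RocksdbStorage;
}

pub mod in_memory {
    // Always compiled; no extra native dependencies required.
    pub struct InMemoryStorage;
}

fn main() {
    let _always_available = in_memory::InMemoryStorage;

    #[cfg(feature = "rocksdb")]
    let _only_with_feature = rocksdb_backend::RocksdbStorage;
}
```

A downstream crate that still wants the RocksDB backend would opt back in by adding `features = ["rocksdb"]` to its `storage` dependency entry.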