Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: In-memory storage implementation #18

Merged
merged 7 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,26 @@ jobs:

- uses: mozilla-actions/[email protected]
- name: install nextest
uses: baptiste0928/cargo-install@v1
uses: baptiste0928/cargo-install@v2
with:
crate: cargo-nextest
- name: install cranky
uses: baptiste0928/cargo-install@v1
uses: baptiste0928/cargo-install@v2
with:
crate: cargo-cranky
- name: install deny
uses: baptiste0928/cargo-install@v1
uses: baptiste0928/cargo-install@v2
with:
crate: cargo-deny
- name: deny
run: cargo deny check
- name: fmt
run: cargo fmt --all --check
- name: cranky
run: cargo cranky --all-targets --all-features
- name: cranky (all features)
run: cargo cranky --workspace --all-targets --all-features
- name: cranky (default features)
run: cargo cranky --workspace --exclude tools --all-targets
- name: build
run: cargo build --all-targets --locked
- name: test
run: cargo nextest run --profile ci

3 changes: 0 additions & 3 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion node/actors/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ license.workspace = true
anyhow.workspace = true
once_cell.workspace = true
rand.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tracing.workspace = true
vise.workspace = true
Expand Down
10 changes: 2 additions & 8 deletions node/actors/consensus/src/testonly/make.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use crate::{
use concurrency::ctx;
use roles::validator;
use std::sync::Arc;
use storage::{FallbackReplicaStateStore, RocksdbStorage};
use tempfile::tempdir;
use storage::{FallbackReplicaStateStore, InMemoryStorage};
use utils::pipe::{self, DispatcherPipe};

/// This creates a mock Consensus struct for unit tests.
Expand All @@ -18,13 +17,8 @@ pub async fn make_consensus(
validator_set: &validator::ValidatorSet,
genesis_block: &validator::FinalBlock,
) -> (Consensus, DispatcherPipe<InputMessage, OutputMessage>) {
// Create a temporary folder.
let temp_dir = tempdir().unwrap();
let temp_file = temp_dir.path().join("block_store");
// Initialize the storage.
let storage = RocksdbStorage::new(ctx, genesis_block, &temp_file)
.await
.unwrap();
let storage = InMemoryStorage::new(genesis_block.clone());
// Create the pipe.
let (consensus_pipe, dispatcher_pipe) = pipe::new();

Expand Down
8 changes: 2 additions & 6 deletions node/actors/consensus/src/testonly/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use storage::{FallbackReplicaStateStore, RocksdbStorage};
use storage::{FallbackReplicaStateStore, InMemoryStorage};
use tracing::Instrument as _;
use utils::pipe;

Expand Down Expand Up @@ -101,11 +101,7 @@ async fn run_nodes(
network_pipes.insert(validator_key.public(), network_actor_pipe);
s.spawn(
async {
let dir = tempfile::tempdir().context("tempdir()")?;
let storage =
RocksdbStorage::new(ctx, &genesis_block, &dir.path().join("storage"))
.await
.context("RocksdbStorage")?;
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = FallbackReplicaStateStore::from_store(Arc::new(storage));

let consensus = Consensus::new(
Expand Down
1 change: 0 additions & 1 deletion node/actors/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,4 @@ sync_blocks = { path = "../sync_blocks" }

[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
tokio.workspace = true
26 changes: 11 additions & 15 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use network::testonly::Instance;
use rand::Rng;
use roles::validator::{BlockNumber, Payload};
use std::collections::HashMap;
use storage::{BlockStore, RocksdbStorage, StorageError};
use storage::{BlockStore, InMemoryStorage, StorageError};

async fn run_executor(ctx: &ctx::Ctx, executor: Executor<RocksdbStorage>) -> anyhow::Result<()> {
async fn run_executor(ctx: &ctx::Ctx, executor: Executor<InMemoryStorage>) -> anyhow::Result<()> {
executor.run(ctx).await.or_else(|err| {
if err.root_cause().is::<ctx::Canceled>() {
Ok(()) // Test has successfully finished
Expand All @@ -22,7 +22,7 @@ async fn run_executor(ctx: &ctx::Ctx, executor: Executor<RocksdbStorage>) -> any
async fn store_final_blocks(
ctx: &ctx::Ctx,
mut blocks_receiver: channel::UnboundedReceiver<FinalBlock>,
storage: Arc<RocksdbStorage>,
storage: Arc<InMemoryStorage>,
) -> anyhow::Result<()> {
while let Ok(block) = blocks_receiver.recv(ctx).await {
tracing::trace!(number = %block.header.number, "Finalized new block");
Expand Down Expand Up @@ -73,9 +73,9 @@ impl FullValidatorConfig {

fn into_executor(
self,
storage: Arc<RocksdbStorage>,
storage: Arc<InMemoryStorage>,
) -> (
Executor<RocksdbStorage>,
Executor<InMemoryStorage>,
channel::UnboundedReceiver<FinalBlock>,
) {
let (blocks_sender, blocks_receiver) = channel::unbounded();
Expand All @@ -100,9 +100,8 @@ async fn executing_single_validator() {

let validator = FullValidatorConfig::for_single_validator(rng);
let genesis_block = &validator.node_config.genesis_block;
let temp_dir = tempfile::tempdir().unwrap();
let storage = RocksdbStorage::new(ctx, genesis_block, temp_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = Arc::new(storage);
let (executor, mut blocks_receiver) = validator.into_executor(storage);

scope::run!(ctx, |ctx, s| async {
Expand Down Expand Up @@ -151,13 +150,10 @@ async fn executing_validator_and_external_node() {
.insert(external_node_key.public());

let genesis_block = &validator.node_config.genesis_block;
let temp_dir = tempfile::tempdir().unwrap();
let validator_storage =
RocksdbStorage::new(ctx, genesis_block, &temp_dir.path().join("validator")).await;
let validator_storage = Arc::new(validator_storage.unwrap());
let external_node_storage =
RocksdbStorage::new(ctx, genesis_block, &temp_dir.path().join("en")).await;
let external_node_storage = Arc::new(external_node_storage.unwrap());
let validator_storage = InMemoryStorage::new(genesis_block.clone());
let validator_storage = Arc::new(validator_storage);
let external_node_storage = InMemoryStorage::new(genesis_block.clone());
let external_node_storage = Arc::new(external_node_storage);
let mut en_subscriber = external_node_storage.subscribe_to_block_writes();

let (validator, blocks_receiver) = validator.into_executor(validator_storage.clone());
Expand Down
1 change: 0 additions & 1 deletion node/actors/sync_blocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@ network = { path = "../network" }
assert_matches.workspace = true
async-trait.workspace = true
rand.workspace = true
tempfile.workspace = true
test-casing.workspace = true
tokio.workspace = true
18 changes: 7 additions & 11 deletions node/actors/sync_blocks/src/peers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use concurrency::time;
use rand::{rngs::StdRng, seq::IteratorRandom, Rng};
use roles::validator;
use std::{collections::HashSet, fmt};
use storage::RocksdbStorage;
use storage::InMemoryStorage;
use test_casing::{test_casing, Product};

const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5);
Expand Down Expand Up @@ -64,15 +64,13 @@ async fn wait_for_stored_block(
async fn test_peer_states<T: Test>(test: T) {
concurrency::testonly::abort_on_panic();

let storage_dir = tempfile::tempdir().unwrap();

let ctx = &ctx::test_root(&ctx::RealClock).with_timeout(TEST_TIMEOUT);
let clock = ctx::ManualClock::new();
let ctx = &ctx::test_with_clock(ctx, &clock);
let mut rng = ctx.rng();
let test_validators = TestValidators::new(4, T::BLOCK_COUNT, &mut rng);
let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
let storage = Arc::new(storage);
test.initialize_storage(ctx, storage.as_ref(), &test_validators)
.await;

Expand Down Expand Up @@ -814,12 +812,11 @@ async fn requesting_blocks_with_unreliable_peers(

#[tokio::test]
async fn processing_invalid_sync_states() {
let storage_dir = tempfile::tempdir().unwrap();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let test_validators = TestValidators::new(4, 3, rng);
let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
let storage = Arc::new(storage);

let (message_sender, _) = channel::unbounded();
let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config());
Expand Down Expand Up @@ -849,12 +846,11 @@ async fn processing_invalid_sync_states() {

#[tokio::test]
async fn processing_invalid_blocks() {
let storage_dir = tempfile::tempdir().unwrap();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let test_validators = TestValidators::new(4, 3, rng);
let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
let storage = Arc::new(storage);

let (message_sender, _) = channel::unbounded();
let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config());
Expand Down
27 changes: 8 additions & 19 deletions node/actors/sync_blocks/src/tests/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use concurrency::ctx::channel;
use network::testonly::Instance as NetworkInstance;
use rand::seq::SliceRandom;
use roles::node;
use std::{fmt, path::Path};
use storage::RocksdbStorage;
use std::fmt;
use storage::InMemoryStorage;
use test_casing::test_casing;
use tracing::Instrument;

Expand Down Expand Up @@ -99,18 +99,13 @@ impl Node {
self.network.gossip_config().key.public()
}

#[instrument(level = "trace", skip(ctx, test_validators, storage_dir), err)]
async fn run(
mut self,
ctx: &ctx::Ctx,
test_validators: &TestValidators,
storage_dir: &Path,
) -> anyhow::Result<()> {
#[instrument(level = "trace", skip(ctx, test_validators), err)]
async fn run(mut self, ctx: &ctx::Ctx, test_validators: &TestValidators) -> anyhow::Result<()> {
let key = self.key();
let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new();
let (network_actor_pipe, network_dispatcher_pipe) = pipe::new();
let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir);
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
let storage = Arc::new(storage);

let sync_blocks_config = test_validators.test_config();
let sync_blocks = SyncBlocks::new(
Expand Down Expand Up @@ -235,23 +230,17 @@ async fn test_sync_blocks<T: GossipNetworkTest>(test: T) {

concurrency::testonly::abort_on_panic();

let temp_dir = tempfile::TempDir::new().unwrap();
let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64))
.with_timeout(TEST_TIMEOUT * CLOCK_SPEEDUP);
let (node_count, gossip_peers) = test.network_params();
let (network, nodes) = GossipNetwork::new(&mut ctx.rng(), node_count, gossip_peers);
scope::run!(ctx, |ctx, s| async {
for (i, node) in nodes.into_iter().enumerate() {
for node in nodes {
let test_validators = network.test_validators.clone();
let storage_dir = temp_dir.path().join(i.to_string());
s.spawn_bg(async {
let test_validators = test_validators;
let storage_dir = storage_dir;
let key = node.key();
let err = node
.run(ctx, &test_validators, &storage_dir)
.await
.unwrap_err();
let err = node.run(ctx, &test_validators).await.unwrap_err();

tracing::trace!(?key, "Node task completed");
if err.root_cause().is::<ctx::Canceled>() {
Expand Down
12 changes: 4 additions & 8 deletions node/actors/sync_blocks/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use roles::validator::{
BlockHeader, BlockNumber, CommitQC, FinalBlock, Payload, ValidatorSet,
};
use std::iter;
use storage::RocksdbStorage;
use storage::InMemoryStorage;
use utils::pipe;

mod end_to_end;
Expand Down Expand Up @@ -106,17 +106,14 @@ impl TestValidators {
async fn subscribing_to_state_updates() {
concurrency::testonly::abort_on_panic();

let storage_dir = tempfile::tempdir().unwrap();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let genesis_block = make_genesis_block(rng);
let block_1 = make_block(rng, &genesis_block.header);
let block_2 = make_block(rng, &block_1.header);
let block_3 = make_block(rng, &block_2.header);

let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path())
.await
.unwrap();
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = &Arc::new(storage);
let (actor_pipe, _dispatcher_pipe) = pipe::new();
let actor = SyncBlocks::new(ctx, actor_pipe, storage.clone(), rng.gen())
Expand Down Expand Up @@ -184,13 +181,12 @@ async fn subscribing_to_state_updates() {
async fn getting_blocks() {
concurrency::testonly::abort_on_panic();

let storage_dir = tempfile::tempdir().unwrap();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let genesis_block = make_genesis_block(rng);

let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path());
let storage = Arc::new(storage.await.unwrap());
let storage = InMemoryStorage::new(genesis_block.clone());
let storage = Arc::new(storage);
let blocks = iter::successors(Some(genesis_block), |parent| {
Some(make_block(rng, &parent.header))
});
Expand Down
7 changes: 6 additions & 1 deletion node/libs/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ license.workspace = true
anyhow.workspace = true
async-trait.workspace = true
rand.workspace = true
rocksdb.workspace = true
rocksdb = { workspace = true, optional = true }
thiserror.workspace = true
tracing.workspace = true

Expand All @@ -23,3 +23,8 @@ assert_matches.workspace = true
tempfile.workspace = true
test-casing.workspace = true
tokio.workspace = true

[features]
default = []
# Enables RocksDB-based storage.
rocksdb = ["dep:rocksdb"]
Loading
Loading