Skip to content

Commit

Permalink
Reorganize storage crate modules
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Oct 24, 2023
1 parent 88b2932 commit 3584010
Show file tree
Hide file tree
Showing 7 changed files with 435 additions and 438 deletions.
24 changes: 2 additions & 22 deletions node/libs/storage/src/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Buffered [`BlockStore`] implementation.
use crate::{
block_store::{BlockStore, MissingBlockNumbers, WriteBlockStore},
traits::{BlockStore, ContiguousBlockStore, WriteBlockStore},
types::MissingBlockNumbers,
StorageError, StorageResult,
};
use async_trait::async_trait;
Expand All @@ -14,27 +15,6 @@ use concurrency::{
use roles::validator::{BlockNumber, FinalBlock};
use std::{collections::BTreeMap, ops};

/// [`BlockStore`] variation that upholds additional invariants as to how blocks are processed.
///
/// The invariants are as follows:
///
/// - Stored blocks always have contiguous numbers; there are no gaps.
/// - Blocks can be scheduled to be added using [`Self::schedule_next_block()`] only. New blocks do not
/// appear in the store otherwise.
#[async_trait]
pub trait ContiguousBlockStore: BlockStore {
/// Schedules a block to be added to the store. Unlike [`WriteBlockStore::put_block()`],
/// there is no expectation that the block is added to the store *immediately*. It's
/// expected that it will be added to the store eventually, which will be signaled via
/// a subscriber returned from [`BlockStore::subscribe_to_block_writes()`].
///
/// [`BufferedStorage`] guarantees that this method will only ever be called:
///
/// - with the next block (i.e., one immediately after [`BlockStore::head_block()`])
/// - sequentially (i.e., multiple blocks cannot be scheduled at once)
async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()>;
}

#[derive(Debug)]
struct BlockBuffer {
store_block_number: BlockNumber,
Expand Down
115 changes: 6 additions & 109 deletions node/libs/storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,120 +1,17 @@
//! This module is responsible for persistent data storage, it provides schema-aware type-safe database access. Currently we use RocksDB,
//! but this crate only exposes an abstraction of a database, so we can easily switch to a different storage engine in the future.
use anyhow::Context as _;
use concurrency::{ctx, scope, sync::watch};
use roles::validator::{self, BlockNumber};
use std::{
fmt, ops,
path::Path,
sync::{atomic::AtomicU64, RwLock},
};
use thiserror::Error;

mod block_store;
mod buffered;
mod replica;
mod rocksdb;
mod testonly;
#[cfg(test)]
mod tests;
mod traits;
mod types;

pub use crate::{
block_store::{BlockStore, WriteBlockStore},
buffered::{BufferedStorage, ContiguousBlockStore},
replica::ReplicaStateStore,
types::ReplicaState,
buffered::BufferedStorage,
rocksdb::RocksdbStorage,
traits::{BlockStore, ContiguousBlockStore, ReplicaStateStore, WriteBlockStore},
types::{ReplicaState, StorageError, StorageResult},
};

/// Storage errors.
#[derive(Debug, Error)]
pub enum StorageError {
/// Operation was canceled by structured concurrency framework.
#[error("operation was canceled by structured concurrency framework")]
Canceled(#[from] ctx::Canceled),
/// Database operation failed.
#[error("database operation failed")]
Database(#[source] anyhow::Error),
}

/// [`Result`] for fallible storage operations.
pub type StorageResult<T> = Result<T, StorageError>;

/// Main struct for the Storage module, it just contains the database. Provides a set of high-level
/// atomic operations on the database. It "contains" the following data:
///
/// - An append-only database of finalized blocks.
/// - A backup of the consensus replica state.
pub struct RocksdbStorage {
/// Wrapped RocksDB instance. We don't need `RwLock` for synchronization *per se*, just to ensure
/// that writes to the DB are linearized.
inner: RwLock<rocksdb::DB>,
/// In-memory cache for the last contiguous block number stored in the DB. The cache is used
/// and updated by `Self::get_last_contiguous_block_number()`. Caching is based on the assumption
/// that blocks are never removed from the DB.
cached_last_contiguous_block_number: AtomicU64,
/// Sender of numbers of written blocks.
block_writes_sender: watch::Sender<BlockNumber>,
}

impl RocksdbStorage {
/// Create a new Storage. It first tries to open an existing database, and if that fails it just creates a
/// a new one. We need the genesis block of the chain as input.
// TODO(bruno): we want to eventually start pruning old blocks, so having the genesis
// block might be unnecessary.
pub async fn new(
ctx: &ctx::Ctx,
genesis_block: &validator::FinalBlock,
path: &Path,
) -> StorageResult<Self> {
let mut options = rocksdb::Options::default();
options.create_missing_column_families(true);
options.create_if_missing(true);

let db = scope::run!(ctx, |_, s| async {
Ok(s.spawn_blocking(|| {
rocksdb::DB::open(&options, path)
.context("Failed opening RocksDB")
.map_err(StorageError::Database)
})
.join(ctx)
.await?)
})
.await?;

let this = Self {
inner: RwLock::new(db),
cached_last_contiguous_block_number: AtomicU64::new(0),
block_writes_sender: watch::channel(genesis_block.block.number).0,
};
if let Some(stored_genesis_block) = this.block(ctx, genesis_block.block.number).await? {
if stored_genesis_block.block != genesis_block.block {
let err = anyhow::anyhow!("Mismatch between stored and expected genesis block");
return Err(StorageError::Database(err));
}
} else {
tracing::debug!(
"Genesis block not present in RocksDB at `{path}`; saving {genesis_block:?}",
path = path.display()
);
this.put_block(ctx, genesis_block).await?;
}
Ok(this)
}

/// Acquires a read lock on the underlying DB.
fn read(&self) -> impl ops::Deref<Target = rocksdb::DB> + '_ {
self.inner.read().expect("DB lock is poisoned")
}

/// Acquires a write lock on the underlying DB.
fn write(&self) -> impl ops::Deref<Target = rocksdb::DB> + '_ {
self.inner.write().expect("DB lock is poisoned")
}
}

impl fmt::Debug for RocksdbStorage {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("RocksdbStorage")
}
}
96 changes: 0 additions & 96 deletions node/libs/storage/src/replica.rs

This file was deleted.

Loading

0 comments on commit 3584010

Please sign in to comment.